From 4d1fe1a9ac45ab39b6bad3516c66bcf295de23ec Mon Sep 17 00:00:00 2001 From: mprammer Date: Tue, 23 Jun 2026 22:24:50 -0400 Subject: [PATCH 1/4] java: add vortex-jni JMH read-boundary benchmark lane MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit New `vortex-jni-bench` module (JMH) that stresses the vortex-jni read boundary — JNI plus the Arrow C Data Interface — which is the path an Iceberg FormatModel takes to read Vortex from the JVM. Three query shapes (full scan, projection, selective filter) over a synthetic six-column table, consumed column-at-a-time so the numbers reflect format/boundary cost rather than per-row JVM allocation. Includes a batch-granularity diagnostic (Vortex coalesces to ~64K-row read batches regardless of write chunk) and a README with run instructions. Must run against a --release native lib (VORTEX_SKIP_MAKE_TEST_FILES=true to preserve it). v2 TODO: a native Rust criterion read of the same file as a floor, to quote boundary overhead vs native. 
Co-Authored-By: Claude Signed-off-by: mprammer --- java/settings.gradle.kts | 3 + java/vortex-jni-bench/README.md | 50 ++++ java/vortex-jni-bench/build.gradle.kts | 29 ++ .../vortex/bench/VortexJniReadBenchmark.java | 281 ++++++++++++++++++ 4 files changed, 363 insertions(+) create mode 100644 java/vortex-jni-bench/README.md create mode 100644 java/vortex-jni-bench/build.gradle.kts create mode 100644 java/vortex-jni-bench/src/jmh/java/dev/vortex/bench/VortexJniReadBenchmark.java diff --git a/java/settings.gradle.kts b/java/settings.gradle.kts index a601cfa9488..8cbf97feb04 100644 --- a/java/settings.gradle.kts +++ b/java/settings.gradle.kts @@ -19,6 +19,9 @@ rootProject.name = "vortex-root" // API bindings include("vortex-jni") + +// Benchmarks +include("vortex-jni-bench") include("vortex-spark_2.12") project(":vortex-spark_2.12").projectDir = file("vortex-spark") diff --git a/java/vortex-jni-bench/README.md b/java/vortex-jni-bench/README.md new file mode 100644 index 00000000000..ad065bf9b78 --- /dev/null +++ b/java/vortex-jni-bench/README.md @@ -0,0 +1,50 @@ +# vortex-jni-bench + +JMH microbenchmarks that stress the **vortex-jni read boundary** — JNI plus the Arrow C Data +Interface — which is the path an Iceberg `FormatModel` takes to read Vortex from the JVM. + +`VortexJniReadBenchmark` writes a synthetic six-column table (2M rows: 2× int64, 2× float64, +2× Utf8View) and reads it back three ways, consuming columns at the buffer level (numeric sums / +null counts) so the numbers reflect format + boundary cost, not per-row Java allocation: + +- `fullScan` — read all six columns. +- `projection` — read two of six (projection pushdown). +- `selectiveFilter` — `cat = 'alpha'` (~1/16 selectivity; filter pushdown). + +`ScanOptions` has no read-batch knob, and Vortex coalesces to ~64K-row read batches regardless of +the writer's chunk size, so the boundary is amortized over large batches by construction. Run the +`main` method to see that batch-granularity diagnostic. 
+ +## Running + +The benchmark **must** run against a `--release` native lib (the dev `makeTestFiles` task builds a +debug lib, which would make the numbers meaningless). Build it once and drop it into vortex-jni's +resources, then run with `VORTEX_SKIP_MAKE_TEST_FILES=true` so the debug rebuild doesn't clobber it: + +```bash +# from repo root: build the release cdylib +cargo build --release -p vortex-jni +# place it for the host arch (example: macOS arm64) +cp target/release/libvortex_jni.dylib \ + java/vortex-jni/src/main/resources/native/darwin-aarch64/ + +cd java +VORTEX_SKIP_MAKE_TEST_FILES=true ./gradlew :vortex-jni-bench:jmh +``` + +Results land in `vortex-jni-bench/build/results/jmh/results.txt`. The JMH fork adds Arrow's +`--add-opens` flags via `@Fork(jvmArgsAppend=...)`. + +Batch-granularity diagnostic: + +```bash +VORTEX_SKIP_MAKE_TEST_FILES=true ./gradlew :vortex-jni-bench:jmhJar +java --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED \ + -cp vortex-jni-bench/build/libs/*-jmh.jar dev.vortex.bench.VortexJniReadBenchmark +``` + +## TODO (v2) + +These benchmarks measure absolute throughput and pushdown effectiveness *through* the boundary, not +the boundary's *overhead*. 
To quote a "().configureEach { enabled = false } + +dependencies { + jmhImplementation(platform(libs.netty.bom)) + jmhImplementation(project(":vortex-jni")) + jmhImplementation(libs.arrow.c.data) + jmhImplementation(libs.arrow.memory.core) + jmhImplementation(libs.arrow.memory.netty) +} + +jmh { + jmhVersion.set("1.37") +} diff --git a/java/vortex-jni-bench/src/jmh/java/dev/vortex/bench/VortexJniReadBenchmark.java b/java/vortex-jni-bench/src/jmh/java/dev/vortex/bench/VortexJniReadBenchmark.java new file mode 100644 index 00000000000..ab3b03ffd7d --- /dev/null +++ b/java/vortex-jni-bench/src/jmh/java/dev/vortex/bench/VortexJniReadBenchmark.java @@ -0,0 +1,281 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +package dev.vortex.bench; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import dev.vortex.api.DataSource; +import dev.vortex.api.Expression; +import dev.vortex.api.Partition; +import dev.vortex.api.Scan; +import dev.vortex.api.ScanOptions; +import dev.vortex.api.Session; +import dev.vortex.api.VortexWriter; +import dev.vortex.jni.NativeLoader; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.HashMap; +import java.util.List; +import java.util.Random; +import java.util.concurrent.TimeUnit; +import org.apache.arrow.c.ArrowArray; +import org.apache.arrow.c.ArrowSchema; +import org.apache.arrow.c.Data; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.BigIntVector; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.Float8Vector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.ViewVarCharVector; +import org.apache.arrow.vector.ipc.ArrowReader; +import org.apache.arrow.vector.types.FloatingPointPrecision; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.Field; +import 
org.apache.arrow.vector.types.pojo.Schema; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +/** + * Measures read throughput through the vortex-jni boundary (JNI + the Arrow C Data Interface). + * + *

<p>Three query shapes — full scan, projection, and a selective filter — over a synthetic six-column table. Rows are + * consumed column-at-a-time (numeric sums, null counts) rather than into per-row Java objects, so the numbers reflect + * the format/boundary cost rather than JVM allocation. + * + *

Note on batch size: {@code ScanOptions} exposes no read-batch knob, and Vortex coalesces to ~64K-row read batches + * regardless of the writer's chunk size (see {@link #main}), so the boundary cost is already amortized over large + * batches by construction — there is no small-batch regime to sweep from the public API. + */ +@BenchmarkMode(Mode.Throughput) +@OutputTimeUnit(TimeUnit.SECONDS) +@Warmup(iterations = 3, time = 2) +@Measurement(iterations = 5, time = 2) +@Fork( + value = 1, + jvmArgsAppend = { + "--add-opens=java.base/java.nio=ALL-UNNAMED", + "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED" + }) +@State(Scope.Benchmark) +public class VortexJniReadBenchmark { + + static final long ROWS = 2_000_000L; + static final int WRITE_CHUNK = 65536; + static final String[] CATS = { + "alpha", "bravo", "charlie", "delta", "echo", "foxtrot", "golf", "hotel", + "india", "juliet", "kilo", "lima", "mike", "november", "oscar", "papa" + }; + + BufferAllocator allocator; + Session session; + DataSource dataSource; + Path file; + + @Setup(Level.Trial) + public void setup() throws Exception { + NativeLoader.loadJni(); + allocator = new RootAllocator(Long.MAX_VALUE); + session = Session.create(); + file = Files.createTempFile("vortex-jni-bench-", ".vortex"); + Files.deleteIfExists(file); + String uri = file.toAbsolutePath().toUri().toString(); + writeTable(session, allocator, uri, WRITE_CHUNK); + dataSource = DataSource.open(session, uri); + } + + @TearDown(Level.Trial) + public void teardown() throws Exception { + // Intentionally does not close the allocator: DataSource/Scan native resources are released by VortexCleaner + // at GC time, which races an explicit allocator.close() and trips leak detection. The JMH fork exits after the + // trial and reclaims everything; we only remove the temp file. 
+ dataSource = null; + if (file != null) { + Files.deleteIfExists(file); + } + } + + private static Schema schema() { + return new Schema(List.of( + Field.notNullable("id", new ArrowType.Int(64, true)), + Field.notNullable("x", new ArrowType.Int(64, true)), + Field.notNullable("y", new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)), + Field.notNullable("z", new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)), + Field.nullable("cat", ArrowType.Utf8View.INSTANCE), + Field.nullable("tag", ArrowType.Utf8View.INSTANCE))); + } + + private static void writeTable(Session session, BufferAllocator allocator, String uri, int chunk) throws Exception { + Schema schema = schema(); + Random rnd = new Random(42); + try (VortexWriter writer = VortexWriter.create(session, uri, schema, new HashMap<>(), allocator); + VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) { + BigIntVector id = (BigIntVector) root.getVector("id"); + BigIntVector x = (BigIntVector) root.getVector("x"); + Float8Vector y = (Float8Vector) root.getVector("y"); + Float8Vector z = (Float8Vector) root.getVector("z"); + ViewVarCharVector cat = (ViewVarCharVector) root.getVector("cat"); + ViewVarCharVector tag = (ViewVarCharVector) root.getVector("tag"); + + long written = 0; + while (written < ROWS) { + int batch = (int) Math.min(chunk, ROWS - written); + for (FieldVector v : root.getFieldVectors()) { + v.reset(); + } + for (int i = 0; i < batch; i++) { + long r = written + i; + id.setSafe(i, r); + x.setSafe(i, rnd.nextInt(1_000_000)); + y.setSafe(i, rnd.nextDouble()); + z.setSafe(i, rnd.nextDouble()); + cat.setSafe(i, CATS[(int) (r % CATS.length)].getBytes(UTF_8)); + tag.setSafe(i, Long.toString(r).getBytes(UTF_8)); + } + root.setRowCount(batch); + try (ArrowArray arr = ArrowArray.allocateNew(allocator); + ArrowSchema sch = ArrowSchema.allocateNew(allocator)) { + Data.exportVectorSchemaRoot(allocator, root, null, arr, sch); + writer.writeBatch(arr.memoryAddress(), 
sch.memoryAddress()); + } + written += batch; + } + } + } + + @Benchmark + public void fullScan(Blackhole bh) throws Exception { + long sumId = 0; + long sumX = 0; + double sumY = 0; + long catNonNull = 0; + Scan scan = dataSource.scan(ScanOptions.of()); + while (scan.hasNext()) { + Partition partition = scan.next(); + try (ArrowReader reader = partition.scanArrow(allocator)) { + while (reader.loadNextBatch()) { + VectorSchemaRoot r = reader.getVectorSchemaRoot(); + int rows = r.getRowCount(); + BigIntVector id = (BigIntVector) r.getVector("id"); + BigIntVector x = (BigIntVector) r.getVector("x"); + Float8Vector y = (Float8Vector) r.getVector("y"); + FieldVector cat = r.getVector("cat"); + for (int i = 0; i < rows; i++) { + sumId += id.get(i); + sumX += x.get(i); + sumY += y.get(i); + if (!cat.isNull(i)) { + catNonNull++; + } + } + } + } + } + bh.consume(sumId); + bh.consume(sumX); + bh.consume(sumY); + bh.consume(catNonNull); + } + + @Benchmark + public void projection(Blackhole bh) throws Exception { + Expression projection = Expression.select(new String[] {"id", "y"}, Expression.root()); + ScanOptions options = ScanOptions.builder().projection(projection).build(); + long sumId = 0; + double sumY = 0; + Scan scan = dataSource.scan(options); + while (scan.hasNext()) { + Partition partition = scan.next(); + try (ArrowReader reader = partition.scanArrow(allocator)) { + while (reader.loadNextBatch()) { + VectorSchemaRoot r = reader.getVectorSchemaRoot(); + int rows = r.getRowCount(); + BigIntVector id = (BigIntVector) r.getVector("id"); + Float8Vector y = (Float8Vector) r.getVector("y"); + for (int i = 0; i < rows; i++) { + sumId += id.get(i); + sumY += y.get(i); + } + } + } + } + bh.consume(sumId); + bh.consume(sumY); + } + + @Benchmark + public void selectiveFilter(Blackhole bh) throws Exception { + Expression filter = + Expression.binary(Expression.BinaryOp.EQ, Expression.column("cat"), Expression.literal(CATS[0])); + ScanOptions options = 
ScanOptions.builder().filter(filter).build(); + long matched = 0; + long sumId = 0; + Scan scan = dataSource.scan(options); + while (scan.hasNext()) { + Partition partition = scan.next(); + try (ArrowReader reader = partition.scanArrow(allocator)) { + while (reader.loadNextBatch()) { + VectorSchemaRoot r = reader.getVectorSchemaRoot(); + int rows = r.getRowCount(); + BigIntVector id = (BigIntVector) r.getVector("id"); + for (int i = 0; i < rows; i++) { + sumId += id.get(i); + matched++; + } + } + } + } + bh.consume(matched); + bh.consume(sumId); + } + + /** + * Diagnostic (not a benchmark): prints the distribution of read batch row counts for a few writer chunk sizes, to + * show that Vortex coalesces to a stable read-batch granularity independent of how the file was written. + */ + public static void main(String[] args) throws Exception { + NativeLoader.loadJni(); + for (int chunk : new int[] {8192, 131072}) { + BufferAllocator alloc = new RootAllocator(Long.MAX_VALUE); + Session sess = Session.create(); + Path f = Files.createTempFile("vortex-jni-diag-" + chunk + "-", ".vortex"); + Files.deleteIfExists(f); + String uri = f.toAbsolutePath().toUri().toString(); + writeTable(sess, alloc, uri, chunk); + DataSource ds = DataSource.open(sess, uri); + long batches = 0; + long rowsSeen = 0; + long minRows = Long.MAX_VALUE; + long maxRows = 0; + Scan scan = ds.scan(ScanOptions.of()); + while (scan.hasNext()) { + Partition partition = scan.next(); + try (ArrowReader reader = partition.scanArrow(alloc)) { + while (reader.loadNextBatch()) { + int rows = reader.getVectorSchemaRoot().getRowCount(); + batches++; + rowsSeen += rows; + minRows = Math.min(minRows, rows); + maxRows = Math.max(maxRows, rows); + } + } + } + System.out.printf( + "writeChunkRows=%d -> %d read batches over %d rows (min=%d, max=%d, avg=%d)%n", + chunk, batches, rowsSeen, minRows, maxRows, batches == 0 ? 
0 : rowsSeen / batches); + Files.deleteIfExists(f); + } + } +} From 56f57bebe464efcd36e27094d0b893d81e0625f7 Mon Sep 17 00:00:00 2001 From: mprammer Date: Wed, 24 Jun 2026 00:20:41 -0400 Subject: [PATCH 2/4] =?UTF-8?q?java(vortex-jni-bench):=20address=20gauntle?= =?UTF-8?q?t=20review=20=E2=80=94=20isolate=20pushdown,=20fix=20units,=20a?= =?UTF-8?q?dd=20guards?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A codex-run gauntlet (fresh/correctness/maint) flagged the first cut as overclaiming relative to what it measured. This commit fixes that: - Isolate native pushdown from JVM-side work: add projectionControl (full scan, consume id,y in Java) and filterControl (full scan, filter cat='alpha' in Java). The pushdown speedup is now projection-vs-projectionControl (~4.1x) and selectiveFilter-vs-filterControl (~4.6x), not the confounded ~6x-vs-fullScan. (M2) - fullScan now consumes all six columns at the buffer level (z, cat, tag added), so the "all-six-column scan" number is honest (~40M rows/s). (M1) - @OperationsPerInvocation(ROWS) so JMH reports input rows/s directly, not scans/s. (M3) - @Setup validates the file before measuring: exact row count, cat='alpha' returns ROWS/|CATS|, projection schema is exactly [id,y] — fast garbage can't be cited. (M5) - Gradle guard fails the jmh task unless VORTEX_SKIP_MAKE_TEST_FILES=true, so a plain run can't silently rebuild + measure the debug lib. (M6) - @Threads(1); tag carries a 10% null rate; README documents the synthetic-data caveats. Read path returns string columns as VarCharVector (Utf8), not ViewVarCharVector — matches the existing TestMinimal read path. Native floor for a boundary-overhead % remains the v2 TODO (M4). 
Co-Authored-By: Claude Signed-off-by: mprammer --- java/vortex-jni-bench/README.md | 39 ++++- java/vortex-jni-bench/build.gradle.kts | 16 ++ .../vortex/bench/VortexJniReadBenchmark.java | 156 +++++++++++++++--- 3 files changed, 178 insertions(+), 33 deletions(-) diff --git a/java/vortex-jni-bench/README.md b/java/vortex-jni-bench/README.md index ad065bf9b78..0df4a97e71e 100644 --- a/java/vortex-jni-bench/README.md +++ b/java/vortex-jni-bench/README.md @@ -4,22 +4,47 @@ JMH microbenchmarks that stress the **vortex-jni read boundary** — JNI plus th Interface — which is the path an Iceberg `FormatModel` takes to read Vortex from the JVM. `VortexJniReadBenchmark` writes a synthetic six-column table (2M rows: 2× int64, 2× float64, -2× Utf8View) and reads it back three ways, consuming columns at the buffer level (numeric sums / -null counts) so the numbers reflect format + boundary cost, not per-row Java allocation: +2× Utf8View) and reads it back, consuming columns at the buffer level (numeric sums, view lengths, +null counts) so the numbers reflect format + boundary cost, not per-row Java allocation. + +Each invocation scans the full 2M-row table, so `@OperationsPerInvocation(ROWS)` makes JMH report +**input rows scanned per second** directly (not scans/s that you have to convert by hand). + +Benchmarks: - `fullScan` — read all six columns. -- `projection` — read two of six (projection pushdown). -- `selectiveFilter` — `cat = 'alpha'` (~1/16 selectivity; filter pushdown). +- `projection` — native projection of `id,y` (two of six columns). +- `projectionControl` — full scan, but consume only `id,y` in Java (NO native projection). +- `selectiveFilter` — native filter `cat = 'alpha'` (~1/16 selectivity). +- `filterControl` — full scan, evaluate `cat = 'alpha'` in Java (NO native filter). 
+ +The **controls are the point**: `projection` vs `projectionControl` and `selectiveFilter` vs +`filterControl` do the same Java-side work with and without native pushdown, so the remaining +speedup isolates native projection / filter pushdown from "fewer vectors / fewer rows touched in +Java." Comparing a pushdown lane against `fullScan` alone conflates the two and overstates pushdown. + +`@Setup` validates the generated file before any measurement (exact row count, `cat='alpha'` returns +exactly `ROWS/|CATS|`, projection schema is exactly `[id, y]`) and fails the trial otherwise — a +corrupt write or broken filter must not silently produce impressive throughput. `ScanOptions` has no read-batch knob, and Vortex coalesces to ~64K-row read batches regardless of the writer's chunk size, so the boundary is amortized over large batches by construction. Run the -`main` method to see that batch-granularity diagnostic. +`main` method to see the batch-granularity diagnostic across chunk sizes. + +**Workload caveats** (it is synthetic, single-machine, warm-cache — directional, not a leaderboard): +`id` is sequential, `cat` is a periodic 16-value low-cardinality column (kept non-null so filter +selectivity is exactly 1/16), `tag` is high-cardinality with a 10% null rate, numerics use a fixed +seed. This shape is friendly to compression and pushdown; a less-compressible / higher-null workload +is future work. These lanes measure throughput and pushdown *through* the boundary, **not** the +boundary's overhead versus a native floor (that comparison is the v2 TODO below). ## Running The benchmark **must** run against a `--release` native lib (the dev `makeTestFiles` task builds a -debug lib, which would make the numbers meaningless). Build it once and drop it into vortex-jni's -resources, then run with `VORTEX_SKIP_MAKE_TEST_FILES=true` so the debug rebuild doesn't clobber it: +debug lib, which would make the numbers meaningless). 
The `jmh` task is **guarded** to fail unless +`VORTEX_SKIP_MAKE_TEST_FILES=true`, so a plain `./gradlew :vortex-jni-bench:jmh` cannot silently +rebuild and measure the debug lib. Build the release lib once, drop it into vortex-jni's resources, +then run: ```bash # from repo root: build the release cdylib diff --git a/java/vortex-jni-bench/build.gradle.kts b/java/vortex-jni-bench/build.gradle.kts index febd4c4479e..e84b893db0f 100644 --- a/java/vortex-jni-bench/build.gradle.kts +++ b/java/vortex-jni-bench/build.gradle.kts @@ -27,3 +27,19 @@ dependencies { jmh { jmhVersion.set("1.37") } + +// Guard: the benchmark is meaningless against a debug native lib. Require the deliberate release path +// (VORTEX_SKIP_MAKE_TEST_FILES=true, with a release libvortex_jni placed in vortex-jni's resources) so a +// plain `./gradlew :vortex-jni-bench:jmh` cannot silently rebuild + measure the debug lib. +tasks.named("jmh") { + doFirst { + if (System.getenv("VORTEX_SKIP_MAKE_TEST_FILES") != "true") { + throw GradleException( + "vortex-jni-bench must run against a RELEASE native lib. Build it " + + "(cargo build --release -p vortex-jni), copy it into " + + "vortex-jni/src/main/resources/native/-/, and re-run with " + + "VORTEX_SKIP_MAKE_TEST_FILES=true. 
See README.md.", + ) + } + } +} diff --git a/java/vortex-jni-bench/src/jmh/java/dev/vortex/bench/VortexJniReadBenchmark.java b/java/vortex-jni-bench/src/jmh/java/dev/vortex/bench/VortexJniReadBenchmark.java index ab3b03ffd7d..ad4123d0745 100644 --- a/java/vortex-jni-bench/src/jmh/java/dev/vortex/bench/VortexJniReadBenchmark.java +++ b/java/vortex-jni-bench/src/jmh/java/dev/vortex/bench/VortexJniReadBenchmark.java @@ -15,6 +15,7 @@ import dev.vortex.jni.NativeLoader; import java.nio.file.Files; import java.nio.file.Path; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Random; @@ -27,6 +28,7 @@ import org.apache.arrow.vector.BigIntVector; import org.apache.arrow.vector.FieldVector; import org.apache.arrow.vector.Float8Vector; +import org.apache.arrow.vector.VarCharVector; import org.apache.arrow.vector.VectorSchemaRoot; import org.apache.arrow.vector.ViewVarCharVector; import org.apache.arrow.vector.ipc.ArrowReader; @@ -40,27 +42,40 @@ import org.openjdk.jmh.annotations.Level; import org.openjdk.jmh.annotations.Measurement; import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OperationsPerInvocation; import org.openjdk.jmh.annotations.OutputTimeUnit; import org.openjdk.jmh.annotations.Scope; import org.openjdk.jmh.annotations.Setup; import org.openjdk.jmh.annotations.State; import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Threads; import org.openjdk.jmh.annotations.Warmup; import org.openjdk.jmh.infra.Blackhole; /** * Measures read throughput through the vortex-jni boundary (JNI + the Arrow C Data Interface). * - *

<p>Three query shapes — full scan, projection, and a selective filter — over a synthetic six-column table. Rows are - * consumed column-at-a-time (numeric sums, null counts) rather than into per-row Java objects, so the numbers reflect - * the format/boundary cost rather than JVM allocation. + *

<p>Every invocation scans the full {@link #ROWS}-row table, so {@code @OperationsPerInvocation(ROWS)} makes JMH + * report input rows scanned per second directly (rather than scans/s that the reader must convert). * - *

<p>Note on batch size: {@code ScanOptions} exposes no read-batch knob, and Vortex coalesces to ~64K-row read batches - * regardless of the writer's chunk size (see {@link #main}), so the boundary cost is already amortized over large - * batches by construction — there is no small-batch regime to sweep from the public API. + *

<p>To isolate native pushdown from JVM-side consumption savings, the projection and filter lanes each have a + * control that does the SAME Java-side work but WITHOUT the native pushdown: + *

<ul> + *
  <li>{@code projection} (native projection of id,y) vs {@code projectionControl} (full scan, consume only id,y) — + * the speedup that remains is attributable to native projection, not to touching fewer Java vectors. + *
  <li>{@code selectiveFilter} (native filter cat='alpha') vs {@code filterControl} (full scan, filter in Java) — + * the speedup that remains is attributable to native filter pushdown, not to summing fewer rows. + *
</ul> + * + *

Rows are consumed column-at-a-time (numeric sums, view lengths, null counts) rather than into per-row Java + * objects, so the numbers reflect format/boundary cost rather than JVM allocation. {@code ScanOptions} exposes no + * read-batch knob, and Vortex coalesces to ~64K-row read batches regardless of the writer's chunk size (see + * {@link #main}), so boundary cost is amortized over large batches by construction. */ @BenchmarkMode(Mode.Throughput) @OutputTimeUnit(TimeUnit.SECONDS) +@OperationsPerInvocation(VortexJniReadBenchmark.ROWS) @Warmup(iterations = 3, time = 2) @Measurement(iterations = 5, time = 2) @Fork( @@ -69,15 +84,18 @@ "--add-opens=java.base/java.nio=ALL-UNNAMED", "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED" }) +@Threads(1) @State(Scope.Benchmark) public class VortexJniReadBenchmark { - static final long ROWS = 2_000_000L; + static final int ROWS = 2_000_000; static final int WRITE_CHUNK = 65536; static final String[] CATS = { "alpha", "bravo", "charlie", "delta", "echo", "foxtrot", "golf", "hotel", "india", "juliet", "kilo", "lima", "mike", "november", "oscar", "papa" }; + static final byte[] ALPHA = "alpha".getBytes(UTF_8); + static final long EXPECTED_ALPHA_MATCHES = ROWS / CATS.length; BufferAllocator allocator; Session session; @@ -94,6 +112,7 @@ public void setup() throws Exception { String uri = file.toAbsolutePath().toUri().toString(); writeTable(session, allocator, uri, WRITE_CHUNK); dataSource = DataSource.open(session, uri); + validate(); } @TearDown(Level.Trial) @@ -107,6 +126,43 @@ public void teardown() throws Exception { } } + /** Fail the trial loudly if the generated file or pushdown semantics are wrong — fast garbage must not be cited. 
*/ + private void validate() throws Exception { + if (!(dataSource.rowCount() instanceof DataSource.RowCount.Exact exact) || exact.value() != ROWS) { + throw new IllegalStateException("expected exactly " + ROWS + " rows, got " + dataSource.rowCount()); + } + // Native filter must return exactly ROWS/|CATS| rows for cat='alpha'. + Expression filter = + Expression.binary(Expression.BinaryOp.EQ, Expression.column("cat"), Expression.literal(CATS[0])); + long matched = 0; + Scan scan = dataSource.scan(ScanOptions.builder().filter(filter).build()); + while (scan.hasNext()) { + try (ArrowReader reader = scan.next().scanArrow(allocator)) { + while (reader.loadNextBatch()) { + matched += reader.getVectorSchemaRoot().getRowCount(); + } + } + } + if (matched != EXPECTED_ALPHA_MATCHES) { + throw new IllegalStateException("filter cat='alpha' returned " + matched + ", expected " + EXPECTED_ALPHA_MATCHES); + } + // Native projection must yield exactly [id, y]. + Expression projection = Expression.select(new String[] {"id", "y"}, Expression.root()); + Scan pscan = dataSource.scan(ScanOptions.builder().projection(projection).build()); + if (pscan.hasNext()) { + try (ArrowReader reader = pscan.next().scanArrow(allocator)) { + if (reader.loadNextBatch()) { + List names = reader.getVectorSchemaRoot().getSchema().getFields().stream() + .map(Field::getName) + .toList(); + if (!names.equals(List.of("id", "y"))) { + throw new IllegalStateException("projection schema expected [id, y], got " + names); + } + } + } + } + } + private static Schema schema() { return new Schema(List.of( Field.notNullable("id", new ArrowType.Int(64, true)), @@ -141,8 +197,14 @@ private static void writeTable(Session session, BufferAllocator allocator, Strin x.setSafe(i, rnd.nextInt(1_000_000)); y.setSafe(i, rnd.nextDouble()); z.setSafe(i, rnd.nextDouble()); + // cat stays non-null and deterministic so filter selectivity is exactly 1/|CATS|. 
cat.setSafe(i, CATS[(int) (r % CATS.length)].getBytes(UTF_8)); - tag.setSafe(i, Long.toString(r).getBytes(UTF_8)); + // tag carries nulls (every 10th row) and high-cardinality values to exercise a validity buffer. + if (r % 10 == 0) { + tag.setNull(i); + } else { + tag.setSafe(i, Long.toString(r).getBytes(UTF_8)); + } } root.setRowCount(batch); try (ArrowArray arr = ArrowArray.allocateNew(allocator); @@ -155,29 +217,37 @@ private static void writeTable(Session session, BufferAllocator allocator, Strin } } + /** Full scan consuming ALL six columns at the buffer level. */ @Benchmark public void fullScan(Blackhole bh) throws Exception { long sumId = 0; long sumX = 0; double sumY = 0; - long catNonNull = 0; + double sumZ = 0; + long catLen = 0; + long tagLenOrNulls = 0; Scan scan = dataSource.scan(ScanOptions.of()); while (scan.hasNext()) { - Partition partition = scan.next(); - try (ArrowReader reader = partition.scanArrow(allocator)) { + try (ArrowReader reader = scan.next().scanArrow(allocator)) { while (reader.loadNextBatch()) { VectorSchemaRoot r = reader.getVectorSchemaRoot(); int rows = r.getRowCount(); BigIntVector id = (BigIntVector) r.getVector("id"); BigIntVector x = (BigIntVector) r.getVector("x"); Float8Vector y = (Float8Vector) r.getVector("y"); - FieldVector cat = r.getVector("cat"); + Float8Vector z = (Float8Vector) r.getVector("z"); + VarCharVector cat = (VarCharVector) r.getVector("cat"); + VarCharVector tag = (VarCharVector) r.getVector("tag"); for (int i = 0; i < rows; i++) { sumId += id.get(i); sumX += x.get(i); sumY += y.get(i); - if (!cat.isNull(i)) { - catNonNull++; + sumZ += z.get(i); + catLen += cat.getValueLength(i); + if (tag.isNull(i)) { + tagLenOrNulls++; + } else { + tagLenOrNulls += tag.getValueLength(i); } } } @@ -186,19 +256,29 @@ public void fullScan(Blackhole bh) throws Exception { bh.consume(sumId); bh.consume(sumX); bh.consume(sumY); - bh.consume(catNonNull); + bh.consume(sumZ); + bh.consume(catLen); + bh.consume(tagLenOrNulls); } 
+ /** Native projection pushdown: only id,y cross the boundary. */ @Benchmark public void projection(Blackhole bh) throws Exception { Expression projection = Expression.select(new String[] {"id", "y"}, Expression.root()); - ScanOptions options = ScanOptions.builder().projection(projection).build(); + bh.consume(consumeIdY(dataSource.scan(ScanOptions.builder().projection(projection).build()))); + } + + /** Control for {@link #projection}: full scan, but consume only id,y in Java (no native projection). */ + @Benchmark + public void projectionControl(Blackhole bh) throws Exception { + bh.consume(consumeIdY(dataSource.scan(ScanOptions.of()))); + } + + private double consumeIdY(Scan scan) throws Exception { long sumId = 0; double sumY = 0; - Scan scan = dataSource.scan(options); while (scan.hasNext()) { - Partition partition = scan.next(); - try (ArrowReader reader = partition.scanArrow(allocator)) { + try (ArrowReader reader = scan.next().scanArrow(allocator)) { while (reader.loadNextBatch()) { VectorSchemaRoot r = reader.getVectorSchemaRoot(); int rows = r.getRowCount(); @@ -211,21 +291,19 @@ public void projection(Blackhole bh) throws Exception { } } } - bh.consume(sumId); - bh.consume(sumY); + return sumId + sumY; } + /** Native filter pushdown: only matching rows cross the boundary. 
*/ @Benchmark public void selectiveFilter(Blackhole bh) throws Exception { Expression filter = Expression.binary(Expression.BinaryOp.EQ, Expression.column("cat"), Expression.literal(CATS[0])); - ScanOptions options = ScanOptions.builder().filter(filter).build(); long matched = 0; long sumId = 0; - Scan scan = dataSource.scan(options); + Scan scan = dataSource.scan(ScanOptions.builder().filter(filter).build()); while (scan.hasNext()) { - Partition partition = scan.next(); - try (ArrowReader reader = partition.scanArrow(allocator)) { + try (ArrowReader reader = scan.next().scanArrow(allocator)) { while (reader.loadNextBatch()) { VectorSchemaRoot r = reader.getVectorSchemaRoot(); int rows = r.getRowCount(); @@ -241,13 +319,39 @@ public void selectiveFilter(Blackhole bh) throws Exception { bh.consume(sumId); } + /** Control for {@link #selectiveFilter}: full scan, evaluate cat='alpha' in Java (no native filter). */ + @Benchmark + public void filterControl(Blackhole bh) throws Exception { + long matched = 0; + long sumId = 0; + Scan scan = dataSource.scan(ScanOptions.of()); + while (scan.hasNext()) { + try (ArrowReader reader = scan.next().scanArrow(allocator)) { + while (reader.loadNextBatch()) { + VectorSchemaRoot r = reader.getVectorSchemaRoot(); + int rows = r.getRowCount(); + BigIntVector id = (BigIntVector) r.getVector("id"); + VarCharVector cat = (VarCharVector) r.getVector("cat"); + for (int i = 0; i < rows; i++) { + if (!cat.isNull(i) && Arrays.equals(cat.get(i), ALPHA)) { + sumId += id.get(i); + matched++; + } + } + } + } + } + bh.consume(matched); + bh.consume(sumId); + } + /** * Diagnostic (not a benchmark): prints the distribution of read batch row counts for a few writer chunk sizes, to * show that Vortex coalesces to a stable read-batch granularity independent of how the file was written. 
*/ public static void main(String[] args) throws Exception { NativeLoader.loadJni(); - for (int chunk : new int[] {8192, 131072}) { + for (int chunk : new int[] {8192, 65536, 131072}) { BufferAllocator alloc = new RootAllocator(Long.MAX_VALUE); Session sess = Session.create(); Path f = Files.createTempFile("vortex-jni-diag-" + chunk + "-", ".vortex"); From 98615fa5630c3197765d5e3b614773c068f202ae Mon Sep 17 00:00:00 2001 From: Robert Kruszewski Date: Thu, 25 Jun 2026 13:59:35 +0100 Subject: [PATCH 3/4] more Signed-off-by: Robert Kruszewski --- Cargo.lock | 1 + java/settings.gradle.kts | 5 +- java/vortex-jni-bench/README.md | 75 ---- java/vortex-jni-bench/build.gradle.kts | 45 -- .../vortex/bench/VortexJniReadBenchmark.java | 385 ------------------ java/vortex-jni/BENCHMARKS.md | 110 +++++ java/vortex-jni/build.gradle.kts | 143 ++++++- .../jmh/java/dev/vortex/bench/BenchData.java | 99 +++++ .../bench/VortexJniBatchDiagnostic.java | 62 +++ .../vortex/bench/VortexJniReadBenchmark.java | 203 +++++++++ vortex-jni/Cargo.toml | 9 +- vortex-jni/benches/canonical/mod.rs | 136 +++++++ vortex-jni/benches/read_boundary.rs | 227 +++++++++++ vortex-jni/examples/gen_bench_data.rs | 42 ++ 14 files changed, 1031 insertions(+), 511 deletions(-) delete mode 100644 java/vortex-jni-bench/README.md delete mode 100644 java/vortex-jni-bench/build.gradle.kts delete mode 100644 java/vortex-jni-bench/src/jmh/java/dev/vortex/bench/VortexJniReadBenchmark.java create mode 100644 java/vortex-jni/BENCHMARKS.md create mode 100644 java/vortex-jni/src/jmh/java/dev/vortex/bench/BenchData.java create mode 100644 java/vortex-jni/src/jmh/java/dev/vortex/bench/VortexJniBatchDiagnostic.java create mode 100644 java/vortex-jni/src/jmh/java/dev/vortex/bench/VortexJniReadBenchmark.java create mode 100644 vortex-jni/benches/canonical/mod.rs create mode 100644 vortex-jni/benches/read_boundary.rs create mode 100644 vortex-jni/examples/gen_bench_data.rs diff --git a/Cargo.lock b/Cargo.lock index 
66457601e06..b3030452e6e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10148,6 +10148,7 @@ dependencies = [ "arrow-array", "arrow-schema", "async-fs", + "codspeed-divan-compat", "futures", "jni", "object_store", diff --git a/java/settings.gradle.kts b/java/settings.gradle.kts index 8cbf97feb04..a71a1b9fd77 100644 --- a/java/settings.gradle.kts +++ b/java/settings.gradle.kts @@ -17,11 +17,10 @@ toolchainManagement { rootProject.name = "vortex-root" -// API bindings +// API bindings (JMH benchmarks live in vortex-jni's `jmh` source set; see vortex-jni/BENCHMARKS.md) include("vortex-jni") -// Benchmarks -include("vortex-jni-bench") +// Spark integration include("vortex-spark_2.12") project(":vortex-spark_2.12").projectDir = file("vortex-spark") diff --git a/java/vortex-jni-bench/README.md b/java/vortex-jni-bench/README.md deleted file mode 100644 index 0df4a97e71e..00000000000 --- a/java/vortex-jni-bench/README.md +++ /dev/null @@ -1,75 +0,0 @@ -# vortex-jni-bench - -JMH microbenchmarks that stress the **vortex-jni read boundary** — JNI plus the Arrow C Data -Interface — which is the path an Iceberg `FormatModel` takes to read Vortex from the JVM. - -`VortexJniReadBenchmark` writes a synthetic six-column table (2M rows: 2× int64, 2× float64, -2× Utf8View) and reads it back, consuming columns at the buffer level (numeric sums, view lengths, -null counts) so the numbers reflect format + boundary cost, not per-row Java allocation. - -Each invocation scans the full 2M-row table, so `@OperationsPerInvocation(ROWS)` makes JMH report -**input rows scanned per second** directly (not scans/s that you have to convert by hand). - -Benchmarks: - -- `fullScan` — read all six columns. -- `projection` — native projection of `id,y` (two of six columns). -- `projectionControl` — full scan, but consume only `id,y` in Java (NO native projection). -- `selectiveFilter` — native filter `cat = 'alpha'` (~1/16 selectivity). 
-- `filterControl` — full scan, evaluate `cat = 'alpha'` in Java (NO native filter). - -The **controls are the point**: `projection` vs `projectionControl` and `selectiveFilter` vs -`filterControl` do the same Java-side work with and without native pushdown, so the remaining -speedup isolates native projection / filter pushdown from "fewer vectors / fewer rows touched in -Java." Comparing a pushdown lane against `fullScan` alone conflates the two and overstates pushdown. - -`@Setup` validates the generated file before any measurement (exact row count, `cat='alpha'` returns -exactly `ROWS/|CATS|`, projection schema is exactly `[id, y]`) and fails the trial otherwise — a -corrupt write or broken filter must not silently produce impressive throughput. - -`ScanOptions` has no read-batch knob, and Vortex coalesces to ~64K-row read batches regardless of -the writer's chunk size, so the boundary is amortized over large batches by construction. Run the -`main` method to see the batch-granularity diagnostic across chunk sizes. - -**Workload caveats** (it is synthetic, single-machine, warm-cache — directional, not a leaderboard): -`id` is sequential, `cat` is a periodic 16-value low-cardinality column (kept non-null so filter -selectivity is exactly 1/16), `tag` is high-cardinality with a 10% null rate, numerics use a fixed -seed. This shape is friendly to compression and pushdown; a less-compressible / higher-null workload -is future work. These lanes measure throughput and pushdown *through* the boundary, **not** the -boundary's overhead versus a native floor (that comparison is the v2 TODO below). - -## Running - -The benchmark **must** run against a `--release` native lib (the dev `makeTestFiles` task builds a -debug lib, which would make the numbers meaningless). The `jmh` task is **guarded** to fail unless -`VORTEX_SKIP_MAKE_TEST_FILES=true`, so a plain `./gradlew :vortex-jni-bench:jmh` cannot silently -rebuild and measure the debug lib. 
Build the release lib once, drop it into vortex-jni's resources, -then run: - -```bash -# from repo root: build the release cdylib -cargo build --release -p vortex-jni -# place it for the host arch (example: macOS arm64) -cp target/release/libvortex_jni.dylib \ - java/vortex-jni/src/main/resources/native/darwin-aarch64/ - -cd java -VORTEX_SKIP_MAKE_TEST_FILES=true ./gradlew :vortex-jni-bench:jmh -``` - -Results land in `vortex-jni-bench/build/results/jmh/results.txt`. The JMH fork adds Arrow's -`--add-opens` flags via `@Fork(jvmArgsAppend=...)`. - -Batch-granularity diagnostic: - -```bash -VORTEX_SKIP_MAKE_TEST_FILES=true ./gradlew :vortex-jni-bench:jmhJar -java --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED \ - -cp vortex-jni-bench/build/libs/*-jmh.jar dev.vortex.bench.VortexJniReadBenchmark -``` - -## TODO (v2) - -These benchmarks measure absolute throughput and pushdown effectiveness *through* the boundary, not -the boundary's *overhead*. To quote a "().configureEach { enabled = false } - -dependencies { - jmhImplementation(platform(libs.netty.bom)) - jmhImplementation(project(":vortex-jni")) - jmhImplementation(libs.arrow.c.data) - jmhImplementation(libs.arrow.memory.core) - jmhImplementation(libs.arrow.memory.netty) -} - -jmh { - jmhVersion.set("1.37") -} - -// Guard: the benchmark is meaningless against a debug native lib. Require the deliberate release path -// (VORTEX_SKIP_MAKE_TEST_FILES=true, with a release libvortex_jni placed in vortex-jni's resources) so a -// plain `./gradlew :vortex-jni-bench:jmh` cannot silently rebuild + measure the debug lib. -tasks.named("jmh") { - doFirst { - if (System.getenv("VORTEX_SKIP_MAKE_TEST_FILES") != "true") { - throw GradleException( - "vortex-jni-bench must run against a RELEASE native lib. Build it " + - "(cargo build --release -p vortex-jni), copy it into " + - "vortex-jni/src/main/resources/native/-/, and re-run with " + - "VORTEX_SKIP_MAKE_TEST_FILES=true. 
See README.md.", - ) - } - } -} diff --git a/java/vortex-jni-bench/src/jmh/java/dev/vortex/bench/VortexJniReadBenchmark.java b/java/vortex-jni-bench/src/jmh/java/dev/vortex/bench/VortexJniReadBenchmark.java deleted file mode 100644 index ad4123d0745..00000000000 --- a/java/vortex-jni-bench/src/jmh/java/dev/vortex/bench/VortexJniReadBenchmark.java +++ /dev/null @@ -1,385 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 -// SPDX-FileCopyrightText: Copyright the Vortex contributors - -package dev.vortex.bench; - -import static java.nio.charset.StandardCharsets.UTF_8; - -import dev.vortex.api.DataSource; -import dev.vortex.api.Expression; -import dev.vortex.api.Partition; -import dev.vortex.api.Scan; -import dev.vortex.api.ScanOptions; -import dev.vortex.api.Session; -import dev.vortex.api.VortexWriter; -import dev.vortex.jni.NativeLoader; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Random; -import java.util.concurrent.TimeUnit; -import org.apache.arrow.c.ArrowArray; -import org.apache.arrow.c.ArrowSchema; -import org.apache.arrow.c.Data; -import org.apache.arrow.memory.BufferAllocator; -import org.apache.arrow.memory.RootAllocator; -import org.apache.arrow.vector.BigIntVector; -import org.apache.arrow.vector.FieldVector; -import org.apache.arrow.vector.Float8Vector; -import org.apache.arrow.vector.VarCharVector; -import org.apache.arrow.vector.VectorSchemaRoot; -import org.apache.arrow.vector.ViewVarCharVector; -import org.apache.arrow.vector.ipc.ArrowReader; -import org.apache.arrow.vector.types.FloatingPointPrecision; -import org.apache.arrow.vector.types.pojo.ArrowType; -import org.apache.arrow.vector.types.pojo.Field; -import org.apache.arrow.vector.types.pojo.Schema; -import org.openjdk.jmh.annotations.Benchmark; -import org.openjdk.jmh.annotations.BenchmarkMode; -import org.openjdk.jmh.annotations.Fork; -import org.openjdk.jmh.annotations.Level; -import 
org.openjdk.jmh.annotations.Measurement; -import org.openjdk.jmh.annotations.Mode; -import org.openjdk.jmh.annotations.OperationsPerInvocation; -import org.openjdk.jmh.annotations.OutputTimeUnit; -import org.openjdk.jmh.annotations.Scope; -import org.openjdk.jmh.annotations.Setup; -import org.openjdk.jmh.annotations.State; -import org.openjdk.jmh.annotations.TearDown; -import org.openjdk.jmh.annotations.Threads; -import org.openjdk.jmh.annotations.Warmup; -import org.openjdk.jmh.infra.Blackhole; - -/** - * Measures read throughput through the vortex-jni boundary (JNI + the Arrow C Data Interface). - * - *

Every invocation scans the full {@link #ROWS}-row table, so {@code @OperationsPerInvocation(ROWS)} makes JMH - * report input rows scanned per second directly (rather than scans/s that the reader must convert). - * - *

To isolate native pushdown from JVM-side consumption savings, the projection and filter lanes each have a - * control that does the SAME Java-side work but WITHOUT the native pushdown: - * - *

    - *
  • {@code projection} (native projection of id,y) vs {@code projectionControl} (full scan, consume only id,y) — - * the speedup that remains is attributable to native projection, not to touching fewer Java vectors. - *
  • {@code selectiveFilter} (native filter cat='alpha') vs {@code filterControl} (full scan, filter in Java) — - * the speedup that remains is attributable to native filter pushdown, not to summing fewer rows. - *
- * - *

Rows are consumed column-at-a-time (numeric sums, view lengths, null counts) rather than into per-row Java - * objects, so the numbers reflect format/boundary cost rather than JVM allocation. {@code ScanOptions} exposes no - * read-batch knob, and Vortex coalesces to ~64K-row read batches regardless of the writer's chunk size (see - * {@link #main}), so boundary cost is amortized over large batches by construction. - */ -@BenchmarkMode(Mode.Throughput) -@OutputTimeUnit(TimeUnit.SECONDS) -@OperationsPerInvocation(VortexJniReadBenchmark.ROWS) -@Warmup(iterations = 3, time = 2) -@Measurement(iterations = 5, time = 2) -@Fork( - value = 1, - jvmArgsAppend = { - "--add-opens=java.base/java.nio=ALL-UNNAMED", - "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED" - }) -@Threads(1) -@State(Scope.Benchmark) -public class VortexJniReadBenchmark { - - static final int ROWS = 2_000_000; - static final int WRITE_CHUNK = 65536; - static final String[] CATS = { - "alpha", "bravo", "charlie", "delta", "echo", "foxtrot", "golf", "hotel", - "india", "juliet", "kilo", "lima", "mike", "november", "oscar", "papa" - }; - static final byte[] ALPHA = "alpha".getBytes(UTF_8); - static final long EXPECTED_ALPHA_MATCHES = ROWS / CATS.length; - - BufferAllocator allocator; - Session session; - DataSource dataSource; - Path file; - - @Setup(Level.Trial) - public void setup() throws Exception { - NativeLoader.loadJni(); - allocator = new RootAllocator(Long.MAX_VALUE); - session = Session.create(); - file = Files.createTempFile("vortex-jni-bench-", ".vortex"); - Files.deleteIfExists(file); - String uri = file.toAbsolutePath().toUri().toString(); - writeTable(session, allocator, uri, WRITE_CHUNK); - dataSource = DataSource.open(session, uri); - validate(); - } - - @TearDown(Level.Trial) - public void teardown() throws Exception { - // Intentionally does not close the allocator: DataSource/Scan native resources are released by VortexCleaner - // at GC time, which races an explicit allocator.close() and 
trips leak detection. The JMH fork exits after the - // trial and reclaims everything; we only remove the temp file. - dataSource = null; - if (file != null) { - Files.deleteIfExists(file); - } - } - - /** Fail the trial loudly if the generated file or pushdown semantics are wrong — fast garbage must not be cited. */ - private void validate() throws Exception { - if (!(dataSource.rowCount() instanceof DataSource.RowCount.Exact exact) || exact.value() != ROWS) { - throw new IllegalStateException("expected exactly " + ROWS + " rows, got " + dataSource.rowCount()); - } - // Native filter must return exactly ROWS/|CATS| rows for cat='alpha'. - Expression filter = - Expression.binary(Expression.BinaryOp.EQ, Expression.column("cat"), Expression.literal(CATS[0])); - long matched = 0; - Scan scan = dataSource.scan(ScanOptions.builder().filter(filter).build()); - while (scan.hasNext()) { - try (ArrowReader reader = scan.next().scanArrow(allocator)) { - while (reader.loadNextBatch()) { - matched += reader.getVectorSchemaRoot().getRowCount(); - } - } - } - if (matched != EXPECTED_ALPHA_MATCHES) { - throw new IllegalStateException("filter cat='alpha' returned " + matched + ", expected " + EXPECTED_ALPHA_MATCHES); - } - // Native projection must yield exactly [id, y]. 
- Expression projection = Expression.select(new String[] {"id", "y"}, Expression.root()); - Scan pscan = dataSource.scan(ScanOptions.builder().projection(projection).build()); - if (pscan.hasNext()) { - try (ArrowReader reader = pscan.next().scanArrow(allocator)) { - if (reader.loadNextBatch()) { - List names = reader.getVectorSchemaRoot().getSchema().getFields().stream() - .map(Field::getName) - .toList(); - if (!names.equals(List.of("id", "y"))) { - throw new IllegalStateException("projection schema expected [id, y], got " + names); - } - } - } - } - } - - private static Schema schema() { - return new Schema(List.of( - Field.notNullable("id", new ArrowType.Int(64, true)), - Field.notNullable("x", new ArrowType.Int(64, true)), - Field.notNullable("y", new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)), - Field.notNullable("z", new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)), - Field.nullable("cat", ArrowType.Utf8View.INSTANCE), - Field.nullable("tag", ArrowType.Utf8View.INSTANCE))); - } - - private static void writeTable(Session session, BufferAllocator allocator, String uri, int chunk) throws Exception { - Schema schema = schema(); - Random rnd = new Random(42); - try (VortexWriter writer = VortexWriter.create(session, uri, schema, new HashMap<>(), allocator); - VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) { - BigIntVector id = (BigIntVector) root.getVector("id"); - BigIntVector x = (BigIntVector) root.getVector("x"); - Float8Vector y = (Float8Vector) root.getVector("y"); - Float8Vector z = (Float8Vector) root.getVector("z"); - ViewVarCharVector cat = (ViewVarCharVector) root.getVector("cat"); - ViewVarCharVector tag = (ViewVarCharVector) root.getVector("tag"); - - long written = 0; - while (written < ROWS) { - int batch = (int) Math.min(chunk, ROWS - written); - for (FieldVector v : root.getFieldVectors()) { - v.reset(); - } - for (int i = 0; i < batch; i++) { - long r = written + i; - id.setSafe(i, r); - x.setSafe(i, 
rnd.nextInt(1_000_000)); - y.setSafe(i, rnd.nextDouble()); - z.setSafe(i, rnd.nextDouble()); - // cat stays non-null and deterministic so filter selectivity is exactly 1/|CATS|. - cat.setSafe(i, CATS[(int) (r % CATS.length)].getBytes(UTF_8)); - // tag carries nulls (every 10th row) and high-cardinality values to exercise a validity buffer. - if (r % 10 == 0) { - tag.setNull(i); - } else { - tag.setSafe(i, Long.toString(r).getBytes(UTF_8)); - } - } - root.setRowCount(batch); - try (ArrowArray arr = ArrowArray.allocateNew(allocator); - ArrowSchema sch = ArrowSchema.allocateNew(allocator)) { - Data.exportVectorSchemaRoot(allocator, root, null, arr, sch); - writer.writeBatch(arr.memoryAddress(), sch.memoryAddress()); - } - written += batch; - } - } - } - - /** Full scan consuming ALL six columns at the buffer level. */ - @Benchmark - public void fullScan(Blackhole bh) throws Exception { - long sumId = 0; - long sumX = 0; - double sumY = 0; - double sumZ = 0; - long catLen = 0; - long tagLenOrNulls = 0; - Scan scan = dataSource.scan(ScanOptions.of()); - while (scan.hasNext()) { - try (ArrowReader reader = scan.next().scanArrow(allocator)) { - while (reader.loadNextBatch()) { - VectorSchemaRoot r = reader.getVectorSchemaRoot(); - int rows = r.getRowCount(); - BigIntVector id = (BigIntVector) r.getVector("id"); - BigIntVector x = (BigIntVector) r.getVector("x"); - Float8Vector y = (Float8Vector) r.getVector("y"); - Float8Vector z = (Float8Vector) r.getVector("z"); - VarCharVector cat = (VarCharVector) r.getVector("cat"); - VarCharVector tag = (VarCharVector) r.getVector("tag"); - for (int i = 0; i < rows; i++) { - sumId += id.get(i); - sumX += x.get(i); - sumY += y.get(i); - sumZ += z.get(i); - catLen += cat.getValueLength(i); - if (tag.isNull(i)) { - tagLenOrNulls++; - } else { - tagLenOrNulls += tag.getValueLength(i); - } - } - } - } - } - bh.consume(sumId); - bh.consume(sumX); - bh.consume(sumY); - bh.consume(sumZ); - bh.consume(catLen); - bh.consume(tagLenOrNulls); - 
} - - /** Native projection pushdown: only id,y cross the boundary. */ - @Benchmark - public void projection(Blackhole bh) throws Exception { - Expression projection = Expression.select(new String[] {"id", "y"}, Expression.root()); - bh.consume(consumeIdY(dataSource.scan(ScanOptions.builder().projection(projection).build()))); - } - - /** Control for {@link #projection}: full scan, but consume only id,y in Java (no native projection). */ - @Benchmark - public void projectionControl(Blackhole bh) throws Exception { - bh.consume(consumeIdY(dataSource.scan(ScanOptions.of()))); - } - - private double consumeIdY(Scan scan) throws Exception { - long sumId = 0; - double sumY = 0; - while (scan.hasNext()) { - try (ArrowReader reader = scan.next().scanArrow(allocator)) { - while (reader.loadNextBatch()) { - VectorSchemaRoot r = reader.getVectorSchemaRoot(); - int rows = r.getRowCount(); - BigIntVector id = (BigIntVector) r.getVector("id"); - Float8Vector y = (Float8Vector) r.getVector("y"); - for (int i = 0; i < rows; i++) { - sumId += id.get(i); - sumY += y.get(i); - } - } - } - } - return sumId + sumY; - } - - /** Native filter pushdown: only matching rows cross the boundary. 
*/ - @Benchmark - public void selectiveFilter(Blackhole bh) throws Exception { - Expression filter = - Expression.binary(Expression.BinaryOp.EQ, Expression.column("cat"), Expression.literal(CATS[0])); - long matched = 0; - long sumId = 0; - Scan scan = dataSource.scan(ScanOptions.builder().filter(filter).build()); - while (scan.hasNext()) { - try (ArrowReader reader = scan.next().scanArrow(allocator)) { - while (reader.loadNextBatch()) { - VectorSchemaRoot r = reader.getVectorSchemaRoot(); - int rows = r.getRowCount(); - BigIntVector id = (BigIntVector) r.getVector("id"); - for (int i = 0; i < rows; i++) { - sumId += id.get(i); - matched++; - } - } - } - } - bh.consume(matched); - bh.consume(sumId); - } - - /** Control for {@link #selectiveFilter}: full scan, evaluate cat='alpha' in Java (no native filter). */ - @Benchmark - public void filterControl(Blackhole bh) throws Exception { - long matched = 0; - long sumId = 0; - Scan scan = dataSource.scan(ScanOptions.of()); - while (scan.hasNext()) { - try (ArrowReader reader = scan.next().scanArrow(allocator)) { - while (reader.loadNextBatch()) { - VectorSchemaRoot r = reader.getVectorSchemaRoot(); - int rows = r.getRowCount(); - BigIntVector id = (BigIntVector) r.getVector("id"); - VarCharVector cat = (VarCharVector) r.getVector("cat"); - for (int i = 0; i < rows; i++) { - if (!cat.isNull(i) && Arrays.equals(cat.get(i), ALPHA)) { - sumId += id.get(i); - matched++; - } - } - } - } - } - bh.consume(matched); - bh.consume(sumId); - } - - /** - * Diagnostic (not a benchmark): prints the distribution of read batch row counts for a few writer chunk sizes, to - * show that Vortex coalesces to a stable read-batch granularity independent of how the file was written. 
- */ - public static void main(String[] args) throws Exception { - NativeLoader.loadJni(); - for (int chunk : new int[] {8192, 65536, 131072}) { - BufferAllocator alloc = new RootAllocator(Long.MAX_VALUE); - Session sess = Session.create(); - Path f = Files.createTempFile("vortex-jni-diag-" + chunk + "-", ".vortex"); - Files.deleteIfExists(f); - String uri = f.toAbsolutePath().toUri().toString(); - writeTable(sess, alloc, uri, chunk); - DataSource ds = DataSource.open(sess, uri); - long batches = 0; - long rowsSeen = 0; - long minRows = Long.MAX_VALUE; - long maxRows = 0; - Scan scan = ds.scan(ScanOptions.of()); - while (scan.hasNext()) { - Partition partition = scan.next(); - try (ArrowReader reader = partition.scanArrow(alloc)) { - while (reader.loadNextBatch()) { - int rows = reader.getVectorSchemaRoot().getRowCount(); - batches++; - rowsSeen += rows; - minRows = Math.min(minRows, rows); - maxRows = Math.max(maxRows, rows); - } - } - } - System.out.printf( - "writeChunkRows=%d -> %d read batches over %d rows (min=%d, max=%d, avg=%d)%n", - chunk, batches, rowsSeen, minRows, maxRows, batches == 0 ? 0 : rowsSeen / batches); - Files.deleteIfExists(f); - } - } -} diff --git a/java/vortex-jni/BENCHMARKS.md b/java/vortex-jni/BENCHMARKS.md new file mode 100644 index 00000000000..8899642780d --- /dev/null +++ b/java/vortex-jni/BENCHMARKS.md @@ -0,0 +1,110 @@ +# vortex-jni read-boundary benchmarks + +Two benchmarks stress the **vortex-jni read boundary** — reading a Vortex file with column projection +and filter pushdown — from two sides that read the **same canonical file**: + +- **`VortexJniReadBenchmark`** (JMH, Java) — reads *through* JNI + the Arrow C Data Interface, the + path an Iceberg `FormatModel` takes to read Vortex from the JVM. Lives in this module's `jmh` + source set (`src/jmh/java/dev/vortex/bench/`). 
+- **`read_boundary`** (Divan, Rust) — the **native floor**: the same `scan → Arrow` work entirely in + Rust, with no JNI crossing and no Arrow C Data export. Lives in the `vortex-jni` *crate* + (`vortex-jni/benches/read_boundary.rs`). + +Because both read the exact same bytes, run the same lanes, and report **input rows scanned per +second**, the numbers are directly comparable: the gap between them is the cost of the JNI + Arrow C +Data boundary on top of the underlying format read. + +## The lanes + +- `fullScan` / `full_scan` — read all six columns. +- `projection` — native projection of `id,y` (two of six columns). +- `selectiveFilter` / `selective_filter` — native filter `cat = 'alpha'` (~1/16 selectivity). + +`projection` vs `fullScan` isolates native **projection** pushdown (same rows, fewer columns +materialized); `selectiveFilter` vs `fullScan` isolates native **filter** pushdown (same scan, fewer +rows produced). + +**Single-threaded vs pooled**: each lane runs in two threading modes. The single-threaded mode drives +the scan on the consuming thread only (the JNI default). The **pooled** mode adds a background worker +pool sized to `available_parallelism() - 1`, so the scan's split tasks run in parallel while the +consuming thread drains results. The Rust bench runs the Vortex→Arrow conversion inside the scan's +`map` (on those handle-spawned split tasks), so the pool parallelizes both the decode and the Arrow +conversion — column-heavy `full_scan` speeds up several-fold; lanes whose Arrow output is tiny +(`projection`, `selective_filter`) are unaffected. The Rust pooled lanes are `*_pooled` +(`full_scan_pooled`, …) backed by a `CurrentThreadWorkerPool`; the JMH side is parameterized by +`workerThreads` (`0` = single-threaded, `-1` = available parallelism) via `NativeRuntime`. + +Each lane scans the full table and **materializes every result batch to Arrow, then sums the batch +row counts** (`getRowCount()` / `RecordBatch::len()`) — no per-value work. 
So the numbers reflect +scan + materialization + (for JMH) the boundary, not consume-side arithmetic. `@OperationsPerInvocation(ROWS)` +(JMH) and `ItemsCount::new(ROWS)` (Divan) both make this **rows scanned/s**. Vortex coalesces to +~64K-row read batches regardless of the writer's chunk size (the `VortexJniBatchDiagnostic` tool +prints this), so boundary cost is amortized over large batches by construction. + +The JMH `@Setup` validates the file before any measurement (exact row count, `cat='alpha'` returns +exactly `ROWS/|CATS|`, projection schema is exactly `[id, y]`) and fails the trial otherwise — a +corrupt file or broken filter must not silently produce impressive throughput. + +**View types**: both benches downgrade Utf8View → Utf8 (and BinaryView → Binary) when materializing +to Arrow — the JMH path because the production `scanArrow` does so for Spark and other view-less +consumers, and the Rust bench via a matching stripped target field — so the two materialize identical +Arrow types. (With row-count consumption neither actually touches the string columns.) + +## The shared canonical file + +Both benches read one canonical `.vortex` file at `target/vortex-jni-bench/data.vortex`, written by +the **Rust generator** (`vortex-jni/benches/canonical/mod.rs`, exposed as the `gen_bench_data` +example). It is a deterministic, six-column, 2M-row table (2× int64, 2× float64, 2× Utf8View) built +from a fixed formula — no RNG — so it is byte-reproducible and the two benches measure identical data. +Generation is idempotent: an existing file is reused, so whichever side runs first writes it and the +other reads it. Delete the file to regenerate. 
+ +## Running + +**Java (JMH), through the boundary** — the `jmh` task wires everything up: + +```bash +cd java +./gradlew :vortex-jni:jmh +``` + +It (a) builds a release-optimized native lib via `buildJmhNativeLib` +(`cargo build --profile release_debug -p vortex-jni`) and stages it on the runtime classpath — the +dev `makeTestFiles` debug build, which would make numbers meaningless, is skipped while the benchmark +is in the task graph; (b) generates the canonical file via `generateBenchFile`; and (c) passes the +Arrow `--add-opens` flags and `-Dvortex.jni.bench.file=` into the forked JVM. Results land in +`vortex-jni/build/results/jmh/results.txt`. + +**Rust (Divan), the native floor**: + +```bash +# from repo root +cargo bench -p vortex-jni --bench read_boundary +``` + +The bench generates the canonical file on first run if absent. (`release_debug` is the `release` +profile plus full debug info, good for profiling; it lives in `target/release_debug/`, separate from +`target/debug` and `target/release`.) + +**Batch-granularity diagnostic** — `VortexJniBatchDiagnostic`, a standalone tool (not a JMH +benchmark) that prints read-batch row counts per writer chunk size: + +```bash +cd java +./gradlew :vortex-jni:batchDiagnostic +``` + +## Comparing the two + +Line up the JMH `ops/s` against the Divan `rows/s` lane-for-lane. The Rust floor is faster on every +lane; the ratio is the JNI + Arrow C Data boundary overhead for that access pattern (it is largest +where the most data crosses the boundary, e.g. `fullScan`, and smallest where pushdown means little +crosses it, e.g. `projection`). Both are **synthetic, single-machine, warm-cache — directional, not a +leaderboard**: `id` is sequential, `cat` is a periodic 16-value low-cardinality column (non-null, so +filter selectivity is exactly 1/16), `tag` is high-cardinality with a 10% null rate. This shape is +friendly to compression and pushdown. 
+ +## Future work + +- A less-compressible / higher-null workload (the current shape favors compression and pushdown). +- A wider/narrower-row and a multi-file variant. diff --git a/java/vortex-jni/build.gradle.kts b/java/vortex-jni/build.gradle.kts index bb652d0d86e..7ce45755fa1 100644 --- a/java/vortex-jni/build.gradle.kts +++ b/java/vortex-jni/build.gradle.kts @@ -2,12 +2,14 @@ // SPDX-FileCopyrightText: Copyright the Vortex contributors import com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar +import net.ltgt.gradle.errorprone.errorprone import org.gradle.kotlin.dsl.support.serviceOf plugins { `java-library` `jvm-test-suite` id("com.gradleup.shadow") version "9.4.2" + id("me.champeau.jmh") version "0.7.3" } dependencies { @@ -92,8 +94,9 @@ tasks.withType().all { ) } -// shade guava and arrow dependencies -tasks.withType { +// shade guava and arrow dependencies in the published jar only. The JMH benchmark links the real +// (unrelocated) Arrow classes, so its jar must not be relocated — scope this to the `shadowJar` task. +tasks.named("shadowJar") { relocate("com.google.common", "dev.vortex.relocated.com.google.common") relocate("org.apache.arrow", "dev.vortex.relocated.org.apache.arrow") { // exclude C Data Interface since JNI cannot be relocated @@ -214,4 +217,140 @@ tasks.register("generateJniHeaders") { dependsOn("compileJava") } +// --------------------------------------------------------------------------- +// JMH benchmarks (src/jmh). See BENCHMARKS.md. +// +// The read-boundary benchmark is meaningless against a debug native lib, so the `jmh` task builds +// and stages the release_debug cdylib itself (buildJmhNativeLib) rather than reusing the dev +// `makeTestFiles` debug build. The benchmark links the real Arrow classes off the runtime classpath +// (it is not run from the relocated shadowJar), so no relocation applies to it. 
+// --------------------------------------------------------------------------- +// Shared canonical benchmark file, generated by the Rust side and read by BOTH the JMH benchmark and +// the Rust `read_boundary` Divan bench so the two measure reads of the exact same bytes. +val workspaceRoot = rootProject.projectDir.absoluteFile.parentFile +val benchFile = workspaceRoot.resolve("target/vortex-jni-bench/data.vortex") + +jmh { + jmhVersion.set("1.37") + // These reach the forked benchmark JVM. The Arrow C Data Interface needs the --add-opens; the + // system property points the benchmark at the shared canonical file. + jvmArgsAppend.addAll( + "--add-opens=java.base/java.nio=ALL-UNNAMED", + "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED", + "-Dvortex.jni.bench.file=${benchFile.absolutePath}", + ) +} + +// Generate the shared canonical .vortex file via the Rust generator example. Idempotent: skipped +// while the file exists (delete it to regenerate). Both `jmh` and the Rust bench read this file. +val generateBenchFile = + tasks.register("generateBenchFile") { + description = "Generate the shared canonical .vortex file read by the JMH and Rust read benchmarks" + group = "verification" + + outputs.file(benchFile) + + doLast { + benchFile.parentFile.mkdirs() + serviceOf().exec { + workingDir = workspaceRoot + executable = "cargo" + args( + "run", + "--profile", + "release_debug", + "--quiet", + "--package", + "vortex-jni", + "--example", + "gen_bench_data", + "--", + benchFile.absolutePath, + ) + } + } + } + +// JMH benchmark classes/methods must be public and non-final, which the nopen checker forbids, and +// the generated JMH glue trips error-prone under -Werror. Relax both for the jmh source set only; +// main and test keep full strictness. 
+tasks.withType<JavaCompile>().configureEach {
+    if (name.lowercase().contains("jmh")) {
+        options.errorprone.enabled.set(false)
+        options.compilerArgs.remove("-Werror")
+    }
+}
+
+// Skip the redundant debug `makeTestFiles` build when this invocation runs the benchmark; the
+// benchmark consumes the release_debug lib staged by buildJmhNativeLib instead.
+val benchmarkRequested = objects.property(Boolean::class.java).convention(false)
+gradle.taskGraph.whenReady {
+    benchmarkRequested.set(allTasks.any { it.project == project && (it.name == "jmh" || it.name == "jmhJar") })
+}
+tasks.named("makeTestFiles").configure {
+    onlyIf { !benchmarkRequested.get() }
+}
+
+val buildJmhNativeLib =
+    tasks.register("buildJmhNativeLib") {
+        description = "Build the release_debug vortex-jni cdylib and stage it for the JMH benchmark"
+        group = "verification"
+
+        // Stage on top of the processed resources so the benchmark loads it from the runtime classpath.
+        dependsOn("processResources")
+
+        doLast {
+            val workspaceRoot = rootProject.projectDir.absoluteFile.parentFile
+
+            serviceOf<ExecOperations>().exec {
+                workingDir = workspaceRoot
+                executable = "cargo"
+                args("build", "--profile", "release_debug", "--package", "vortex-jni")
+            }
+
+            val osName = System.getProperty("os.name").lowercase()
+            val osArch = System.getProperty("os.arch").lowercase()
+            val osShortName =
+                when {
+                    osName.contains("mac") -> "darwin"
+                    osName.contains("nix") || osName.contains("nux") -> "linux"
+                    osName.contains("win") -> "win"
+                    else -> throw GradleException("Unsupported OS for buildJmhNativeLib: $osName")
+                }
+            // cargo names a cdylib `lib<name>.so` / `lib<name>.dylib` on Unix but `<name>.dll` on Windows.
+            val libFileName =
+                when (osShortName) {
+                    "darwin" -> "libvortex_jni.dylib"
+                    "linux" -> "libvortex_jni.so"
+                    "win" -> "vortex_jni.dll"
+                    else -> throw GradleException("Unsupported OS short name: $osShortName")
+                }
+
+            copy {
+                from("$workspaceRoot/target/release_debug/$libFileName")
+                into(layout.buildDirectory.dir("resources/main/native/$osShortName-$osArch"))
+            }
+        }
+    }
+
+tasks.named("jmh").configure {
+    dependsOn(buildJmhNativeLib)
+
dependsOn(generateBenchFile)
+}
+tasks.named("jmhJar").configure { dependsOn(buildJmhNativeLib) }
+
+// Standalone read-batch-granularity diagnostic (VortexJniBatchDiagnostic, not a JMH benchmark). Run
+// it off the jmh runtime classpath, which carries the real Arrow classes and the staged
+// release_debug lib (me.champeau.jmh's fat `jmhJar` does not bundle deps under com.gradleup.shadow).
+// Registered as a JavaExec task: `classpath`, `mainClass` and `jvmArgs` below are JavaExec properties.
+tasks.register<JavaExec>("batchDiagnostic") {
+    description = "Run the standalone read-batch-granularity diagnostic (VortexJniBatchDiagnostic)"
+    group = "verification"
+    dependsOn("buildJmhNativeLib")
+    classpath = sourceSets["jmh"].runtimeClasspath
+    mainClass.set("dev.vortex.bench.VortexJniBatchDiagnostic")
+    jvmArgs(
+        "--add-opens=java.base/java.nio=ALL-UNNAMED",
+        "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED",
+    )
+}
+
 description = "JNI bindings for the Vortex format"
diff --git a/java/vortex-jni/src/jmh/java/dev/vortex/bench/BenchData.java b/java/vortex-jni/src/jmh/java/dev/vortex/bench/BenchData.java
new file mode 100644
index 00000000000..0b0fec24e82
--- /dev/null
+++ b/java/vortex-jni/src/jmh/java/dev/vortex/bench/BenchData.java
@@ -0,0 +1,99 @@
+// SPDX-License-Identifier: Apache-2.0
+// SPDX-FileCopyrightText: Copyright the Vortex contributors
+
+package dev.vortex.bench;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import dev.vortex.api.Session;
+import dev.vortex.api.VortexWriter;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Random;
+import org.apache.arrow.c.ArrowArray;
+import org.apache.arrow.c.ArrowSchema;
+import org.apache.arrow.c.Data;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.Float8Vector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ViewVarCharVector;
+import org.apache.arrow.vector.types.FloatingPointPrecision;
+import
org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.Schema; + +/** + * Shared synthetic table used by both {@link VortexJniReadBenchmark} and {@link VortexJniBatchDiagnostic} so they + * measure and inspect the exact same data shape: six columns (2× int64, 2× float64, 2× Utf8View) over {@link #ROWS} + * rows, with a deterministic fixed seed. + * + *
<p>
{@code id} is sequential, {@code cat} is a periodic low-cardinality column kept non-null so a {@code cat='alpha'} + * filter has selectivity exactly {@code 1/|CATS|}, and {@code tag} is high-cardinality with a 10% null rate to exercise + * a validity buffer. + */ +final class BenchData { + + static final int ROWS = 2_000_000; + static final String[] CATS = { + "alpha", "bravo", "charlie", "delta", "echo", "foxtrot", "golf", "hotel", + "india", "juliet", "kilo", "lima", "mike", "november", "oscar", "papa" + }; + + private BenchData() {} + + static Schema schema() { + return new Schema(List.of( + Field.notNullable("id", new ArrowType.Int(64, true)), + Field.notNullable("x", new ArrowType.Int(64, true)), + Field.notNullable("y", new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)), + Field.notNullable("z", new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)), + Field.nullable("cat", ArrowType.Utf8View.INSTANCE), + Field.nullable("tag", ArrowType.Utf8View.INSTANCE))); + } + + static void writeTable(Session session, BufferAllocator allocator, String uri, int chunk) throws Exception { + Schema schema = schema(); + Random rnd = new Random(42); + try (VortexWriter writer = VortexWriter.create(session, uri, schema, new HashMap<>(), allocator); + VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) { + BigIntVector id = (BigIntVector) root.getVector("id"); + BigIntVector x = (BigIntVector) root.getVector("x"); + Float8Vector y = (Float8Vector) root.getVector("y"); + Float8Vector z = (Float8Vector) root.getVector("z"); + ViewVarCharVector cat = (ViewVarCharVector) root.getVector("cat"); + ViewVarCharVector tag = (ViewVarCharVector) root.getVector("tag"); + + long written = 0; + while (written < ROWS) { + int batch = (int) Math.min(chunk, ROWS - written); + for (FieldVector v : root.getFieldVectors()) { + v.reset(); + } + for (int i = 0; i < batch; i++) { + long r = written + i; + id.setSafe(i, r); + x.setSafe(i, rnd.nextInt(1_000_000)); + 
y.setSafe(i, rnd.nextDouble()); + z.setSafe(i, rnd.nextDouble()); + // cat stays non-null and deterministic so filter selectivity is exactly 1/|CATS|. + cat.setSafe(i, CATS[(int) (r % CATS.length)].getBytes(UTF_8)); + // tag carries nulls (every 10th row) and high-cardinality values to exercise a validity buffer. + if (r % 10 == 0) { + tag.setNull(i); + } else { + tag.setSafe(i, Long.toString(r).getBytes(UTF_8)); + } + } + root.setRowCount(batch); + try (ArrowArray arr = ArrowArray.allocateNew(allocator); + ArrowSchema sch = ArrowSchema.allocateNew(allocator)) { + Data.exportVectorSchemaRoot(allocator, root, null, arr, sch); + writer.writeBatch(arr.memoryAddress(), sch.memoryAddress()); + } + written += batch; + } + } + } +} diff --git a/java/vortex-jni/src/jmh/java/dev/vortex/bench/VortexJniBatchDiagnostic.java b/java/vortex-jni/src/jmh/java/dev/vortex/bench/VortexJniBatchDiagnostic.java new file mode 100644 index 00000000000..1b1658de678 --- /dev/null +++ b/java/vortex-jni/src/jmh/java/dev/vortex/bench/VortexJniBatchDiagnostic.java @@ -0,0 +1,62 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +package dev.vortex.bench; + +import dev.vortex.api.DataSource; +import dev.vortex.api.Partition; +import dev.vortex.api.Scan; +import dev.vortex.api.ScanOptions; +import dev.vortex.api.Session; +import dev.vortex.jni.NativeLoader; +import java.nio.file.Files; +import java.nio.file.Path; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.ipc.ArrowReader; + +/** + * Standalone diagnostic (not a JMH benchmark): writes the shared {@link BenchData} table at several writer chunk sizes + * and prints the resulting read-batch row-count distribution, showing that Vortex coalesces to a stable read-batch + * granularity (~64K rows) independent of how the file was written. + * + *
<p>
Run it with {@code ./gradlew :vortex-jni:batchDiagnostic}. + */ +public final class VortexJniBatchDiagnostic { + + private VortexJniBatchDiagnostic() {} + + public static void main(String[] args) throws Exception { + NativeLoader.loadJni(); + for (int chunk : new int[] {8192, 65536, 131072}) { + BufferAllocator alloc = new RootAllocator(Long.MAX_VALUE); + Session sess = Session.create(); + Path f = Files.createTempFile("vortex-jni-diag-" + chunk + "-", ".vortex"); + Files.deleteIfExists(f); + String uri = f.toAbsolutePath().toUri().toString(); + BenchData.writeTable(sess, alloc, uri, chunk); + DataSource ds = DataSource.open(sess, uri); + long batches = 0; + long rowsSeen = 0; + long minRows = Long.MAX_VALUE; + long maxRows = 0; + Scan scan = ds.scan(ScanOptions.of()); + while (scan.hasNext()) { + Partition partition = scan.next(); + try (ArrowReader reader = partition.scanArrow(alloc)) { + while (reader.loadNextBatch()) { + int rows = reader.getVectorSchemaRoot().getRowCount(); + batches++; + rowsSeen += rows; + minRows = Math.min(minRows, rows); + maxRows = Math.max(maxRows, rows); + } + } + } + System.out.printf( + "writeChunkRows=%d -> %d read batches over %d rows (min=%d, max=%d, avg=%d)%n", + chunk, batches, rowsSeen, minRows, maxRows, batches == 0 ? 
0 : rowsSeen / batches); + Files.deleteIfExists(f); + } + } +} diff --git a/java/vortex-jni/src/jmh/java/dev/vortex/bench/VortexJniReadBenchmark.java b/java/vortex-jni/src/jmh/java/dev/vortex/bench/VortexJniReadBenchmark.java new file mode 100644 index 00000000000..577cc257104 --- /dev/null +++ b/java/vortex-jni/src/jmh/java/dev/vortex/bench/VortexJniReadBenchmark.java @@ -0,0 +1,203 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +package dev.vortex.bench; + +import dev.vortex.api.DataSource; +import dev.vortex.api.Expression; +import dev.vortex.api.Scan; +import dev.vortex.api.ScanOptions; +import dev.vortex.api.Session; +import dev.vortex.jni.NativeLoader; +import dev.vortex.jni.NativeRuntime; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.List; +import java.util.concurrent.TimeUnit; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.ipc.ArrowReader; +import org.apache.arrow.vector.types.pojo.Field; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OperationsPerInvocation; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +/** + * Measures read throughput through the vortex-jni boundary (JNI + the Arrow C Data Interface). + * + *
<p>
Every invocation scans the full {@link BenchData#ROWS}-row table, so {@code @OperationsPerInvocation(ROWS)} makes + * JMH report input rows scanned per second directly. Each lane materializes the result batches and sums their + * {@code getRowCount()} — no per-value work — so the numbers reflect scan + boundary cost rather than JVM-side + * arithmetic. Vortex coalesces to ~64K-row read batches regardless of the writer's chunk size (see + * {@link VortexJniBatchDiagnostic}), so boundary cost is amortized over large batches by construction. + * + *

    <ul>
+ *   <li>{@code fullScan} — read all six columns.
+ *   <li>{@code projection} — native projection of {@code id, y} (two of six columns).
+ *   <li>{@code selectiveFilter} — native filter {@code cat = 'alpha'} (~1/16 selectivity).
+ * </ul>
+ *
+ * <p>

This reads a shared canonical file generated by the Rust side (see {@code BENCHMARKS.md} and the + * {@code :vortex-jni:generateBenchFile} Gradle task), so these JMH {@code ops/s} are directly comparable to the Rust + * {@code read_boundary} Divan bench reading the same bytes — the difference is the JNI + Arrow C Data boundary cost. + * The Gradle {@code jmh} task passes the file path via the {@code -Dvortex.jni.bench.file} system property; running the + * benchmark by hand requires setting it (or {@code VORTEX_JNI_BENCH_FILE}). + */ +@BenchmarkMode(Mode.Throughput) +@OutputTimeUnit(TimeUnit.SECONDS) +@OperationsPerInvocation(BenchData.ROWS) +@Warmup(iterations = 3, time = 2) +@Measurement(iterations = 5, time = 2) +@Fork(1) +@Threads(1) +@State(Scope.Benchmark) +public class VortexJniReadBenchmark { + + /** System property (set by the Gradle {@code jmh} task) pointing at the shared canonical {@code .vortex} file. */ + static final String BENCH_FILE_PROPERTY = "vortex.jni.bench.file"; + + static final long EXPECTED_ALPHA_MATCHES = BenchData.ROWS / BenchData.CATS.length; + + /** + * Background worker threads driving the JVM-wide pool: {@code 0} keeps the reading thread the only driver + * (single-threaded), {@code -1} sizes the pool to the available parallelism so the scan's split tasks decode in + * parallel in the background. 
+ */ + @Param({"0", "-1"}) + public int workerThreads; + + BufferAllocator allocator; + Session session; + DataSource dataSource; + + @Setup(Level.Trial) + public void setup() throws Exception { + NativeLoader.loadJni(); + if (workerThreads < 0) { + NativeRuntime.setWorkerThreadsToAvailableParallelism(); + } else { + NativeRuntime.setWorkerThreads(workerThreads); + } + allocator = new RootAllocator(Long.MAX_VALUE); + session = Session.create(); + String uri = benchFile().toUri().toString(); + dataSource = DataSource.open(session, uri); + validate(); + } + + /** Locate the shared canonical file (generated by {@code :vortex-jni:generateBenchFile}); fail loudly if absent. */ + private static Path benchFile() { + String configured = System.getProperty(BENCH_FILE_PROPERTY); + if (configured == null || configured.isBlank()) { + configured = System.getenv("VORTEX_JNI_BENCH_FILE"); + } + if (configured == null || configured.isBlank()) { + throw new IllegalStateException("canonical bench file not configured: set -D" + BENCH_FILE_PROPERTY + + " (the Gradle `jmh` task does this). Run `./gradlew :vortex-jni:jmh`."); + } + Path path = Path.of(configured).toAbsolutePath(); + if (!Files.isRegularFile(path)) { + throw new IllegalStateException("canonical bench file does not exist: " + path + + " — run `./gradlew :vortex-jni:generateBenchFile`."); + } + return path; + } + + @TearDown(Level.Trial) + public void teardown() { + // The canonical file is shared with the Rust bench and managed by Gradle, so it is not deleted here. + // Intentionally does not close the allocator either: DataSource/Scan native resources are released by + // VortexCleaner at GC time, which races an explicit allocator.close() and trips leak detection. The JMH fork + // exits after the trial and reclaims everything. + dataSource = null; + } + + /** Fail the trial loudly if the file or pushdown semantics are wrong — fast garbage must not be cited. 
*/ + private void validate() throws Exception { + if (!(dataSource.rowCount() instanceof DataSource.RowCount.Exact exact) || exact.value() != BenchData.ROWS) { + throw new IllegalStateException( + "expected exactly " + BenchData.ROWS + " rows, got " + dataSource.rowCount()); + } + // Native filter must return exactly ROWS/|CATS| rows for cat='alpha'. + Expression filter = Expression.binary( + Expression.BinaryOp.EQ, Expression.column("cat"), Expression.literal(BenchData.CATS[0])); + long matched = 0; + Scan scan = dataSource.scan(ScanOptions.builder().filter(filter).build()); + while (scan.hasNext()) { + try (ArrowReader reader = scan.next().scanArrow(allocator)) { + while (reader.loadNextBatch()) { + matched += reader.getVectorSchemaRoot().getRowCount(); + } + } + } + if (matched != EXPECTED_ALPHA_MATCHES) { + throw new IllegalStateException( + "filter cat='alpha' returned " + matched + ", expected " + EXPECTED_ALPHA_MATCHES); + } + // Native projection must yield exactly [id, y]. + Expression projection = Expression.select(new String[] {"id", "y"}, Expression.root()); + Scan pscan = + dataSource.scan(ScanOptions.builder().projection(projection).build()); + if (pscan.hasNext()) { + try (ArrowReader reader = pscan.next().scanArrow(allocator)) { + if (reader.loadNextBatch()) { + List names = reader.getVectorSchemaRoot().getSchema().getFields().stream() + .map(Field::getName) + .toList(); + if (!names.equals(List.of("id", "y"))) { + throw new IllegalStateException("projection schema expected [id, y], got " + names); + } + } + } + } + } + + /** Full scan of all six columns. */ + @Benchmark + public void fullScan(Blackhole bh) throws Exception { + bh.consume(countRows(dataSource.scan(ScanOptions.of()))); + } + + /** Native projection pushdown: only id,y cross the boundary. 
*/ + @Benchmark + public void projection(Blackhole bh) throws Exception { + Expression projection = Expression.select(new String[] {"id", "y"}, Expression.root()); + bh.consume(countRows( + dataSource.scan(ScanOptions.builder().projection(projection).build()))); + } + + /** Native filter pushdown: only matching rows cross the boundary. */ + @Benchmark + public void selectiveFilter(Blackhole bh) throws Exception { + Expression filter = Expression.binary( + Expression.BinaryOp.EQ, Expression.column("cat"), Expression.literal(BenchData.CATS[0])); + bh.consume( + countRows(dataSource.scan(ScanOptions.builder().filter(filter).build()))); + } + + /** Materialize every result batch and return the total row count — no per-value consumption. */ + private long countRows(Scan scan) throws Exception { + long rows = 0; + while (scan.hasNext()) { + try (ArrowReader reader = scan.next().scanArrow(allocator)) { + while (reader.loadNextBatch()) { + rows += reader.getVectorSchemaRoot().getRowCount(); + } + } + } + return rows; + } +} diff --git a/vortex-jni/Cargo.toml b/vortex-jni/Cargo.toml index d5f5cea61aa..046d5c017fa 100644 --- a/vortex-jni/Cargo.toml +++ b/vortex-jni/Cargo.toml @@ -33,10 +33,17 @@ vortex = { workspace = true, features = ["object_store", "files"] } vortex-parquet-variant = { workspace = true } [dev-dependencies] +divan = { workspace = true } jni = { workspace = true, features = ["invocation"] } [lib] -crate-type = ["staticlib", "cdylib"] +# rlib is added so the read-boundary bench and the canonical-file generator example can link the +# crate; staticlib/cdylib remain for the JNI native library. 
+crate-type = ["staticlib", "cdylib", "rlib"] + +[[bench]] +name = "read_boundary" +harness = false [lints] workspace = true diff --git a/vortex-jni/benches/canonical/mod.rs b/vortex-jni/benches/canonical/mod.rs new file mode 100644 index 00000000000..9e48cb58ee8 --- /dev/null +++ b/vortex-jni/benches/canonical/mod.rs @@ -0,0 +1,136 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! Deterministic generator for the canonical read-boundary benchmark file. +//! +//! Both the `read_boundary` Divan bench and the `gen_bench_data` example write/read the SAME file +//! so the Rust "native floor" and the Java JMH benchmark (`VortexJniReadBenchmark`) measure reads of +//! the exact same bytes. The Gradle `generateBenchFile` task runs the example to produce it, then +//! points the JMH fork at it via `-Dvortex.jni.bench.file`. +//! +//! The table is six columns over [`ROWS`] rows — `id`, `x` (int64), `y`, `z` (float64), and `cat`, +//! `tag` (Utf8View) — generated by a fixed formula (no RNG) so the file is reproducible: `id` is +//! sequential, `cat` is a periodic 16-value low-cardinality column kept non-null so `cat = 'alpha'` +//! has selectivity exactly `1/|CATS|`, and `tag` is high-cardinality with a 10% null rate. 
+ +use std::path::Path; +use std::path::PathBuf; + +use vortex::VortexSessionDefault; +use vortex::array::ArrayRef; +use vortex::array::IntoArray; +use vortex::array::arrays::ChunkedArray; +use vortex::array::arrays::PrimitiveArray; +use vortex::array::arrays::StructArray; +use vortex::array::arrays::VarBinViewArray; +use vortex::array::validity::Validity; +use vortex::dtype::FieldNames; +use vortex::error::VortexResult; +use vortex::error::vortex_err; +use vortex::file::WriteOptionsSessionExt; +use vortex::io::runtime::BlockingRuntime; +use vortex::io::runtime::current::CurrentThreadRuntime; +use vortex::io::session::RuntimeSessionExt; +use vortex::session::VortexSession; + +/// Rows in the canonical table. Must match the JMH side's `BenchData.ROWS`. +pub const ROWS: usize = 2_000_000; +/// Rows per Arrow batch handed to the writer. +pub const WRITE_CHUNK: usize = 65_536; +/// 16 low-cardinality category values; `cat = 'alpha'` matches exactly `ROWS / 16` rows. +pub const CATS: [&str; 16] = [ + "alpha", "bravo", "charlie", "delta", "echo", "foxtrot", "golf", "hotel", "india", "juliet", + "kilo", "lima", "mike", "november", "oscar", "papa", +]; + +/// Absolute path of the canonical file, shared with the Gradle/JMH side. +/// +/// Honors `VORTEX_JNI_BENCH_FILE`; otherwise defaults to `/target/vortex-jni-bench/ +/// data.vortex` (the same location the Gradle task passes), so a standalone `cargo bench` and a +/// `./gradlew :vortex-jni:jmh` run resolve to the same file. +pub fn default_path() -> PathBuf { + if let Ok(path) = std::env::var("VORTEX_JNI_BENCH_FILE") { + return PathBuf::from(path); + } + Path::new(env!("CARGO_MANIFEST_DIR")).join("../target/vortex-jni-bench/data.vortex") +} + +/// Build one `[start, end)` slice of the canonical table as a Vortex struct array. 
+fn build_chunk(start: usize, end: usize) -> ArrayRef { + let id = PrimitiveArray::from_iter((start..end).map(|r| r as i64)).into_array(); + // Deterministic spread across [0, 1_000_000); a fixed multiplicative hash, no RNG. + let xs = PrimitiveArray::from_iter( + (start..end).map(|r| (r as i64).wrapping_mul(2_654_435_761).rem_euclid(1_000_000)), + ) + .into_array(); + let ys = + PrimitiveArray::from_iter((start..end).map(|r| (r % 1000) as f64 / 1000.0)).into_array(); + let zs = PrimitiveArray::from_iter((start..end).map(|r| ((r / 1000) % 1000) as f64 / 1000.0)) + .into_array(); + // cat stays non-null and periodic so filter selectivity is exactly 1/|CATS|. + let cat = + VarBinViewArray::from_iter_str((start..end).map(|r| CATS[r % CATS.len()])).into_array(); + // tag carries nulls (every 10th row) and high-cardinality values to exercise a validity buffer. + let tag = VarBinViewArray::from_iter_nullable_str((start..end).map(|r| { + if r % 10 == 0 { + None + } else { + Some(r.to_string()) + } + })) + .into_array(); + + StructArray::new( + FieldNames::from(["id", "x", "y", "z", "cat", "tag"]), + vec![id, xs, ys, zs, cat, tag], + end - start, + Validity::NonNullable, + ) + .into_array() +} + +/// Build the canonical six-column table as a chunked array of [`WRITE_CHUNK`]-row chunks. +fn build_table() -> VortexResult { + let mut chunks = Vec::new(); + let mut start = 0; + while start < ROWS { + let end = (start + WRITE_CHUNK).min(ROWS); + chunks.push(build_chunk(start, end)); + start = end; + } + let dtype = chunks[0].dtype().clone(); + Ok(ChunkedArray::try_new(chunks, dtype)?.into_array()) +} + +/// Write the canonical file to `path`, creating parent directories as needed. 
+pub fn write_canonical(path: &Path) -> VortexResult<()> {
+    let runtime = CurrentThreadRuntime::new();
+    let session = VortexSession::default().with_handle(runtime.handle());
+    let table = build_table()?;
+
+    // Serialize into memory first; the file is only created once the write has succeeded.
+    let mut bytes: Vec<u8> = Vec::new();
+    runtime.block_on(async {
+        session
+            .write_options()
+            .write(&mut bytes, table.to_array_stream())
+            .await
+    })?;
+
+    if let Some(parent) = path.parent() {
+        std::fs::create_dir_all(parent)
+            .map_err(|e| vortex_err!("failed to create {}: {e}", parent.display()))?;
+    }
+    std::fs::write(path, &bytes)
+        .map_err(|e| vortex_err!("failed to write {}: {e}", path.display()))?;
+    Ok(())
+}
+
+/// Write the canonical file only if it does not already exist (idempotent), so the bench and the
+/// Gradle generator share one file rather than racing to overwrite it.
+pub fn ensure_canonical(path: &Path) -> VortexResult<()> {
+    // A missing, unreadable, or zero-length file counts as absent and is (re)generated.
+    let present = path.metadata().map(|m| m.len() > 0).unwrap_or(false);
+    if present {
+        return Ok(());
+    }
+    write_canonical(path)
+}
diff --git a/vortex-jni/benches/read_boundary.rs b/vortex-jni/benches/read_boundary.rs
new file mode 100644
index 00000000000..ea18a63b689
--- /dev/null
+++ b/vortex-jni/benches/read_boundary.rs
@@ -0,0 +1,227 @@
+// SPDX-License-Identifier: Apache-2.0
+// SPDX-FileCopyrightText: Copyright the Vortex contributors
+
+//! Native "floor" for the `VortexJniReadBenchmark` JMH lanes.
+//!
+//! This reads the SAME canonical `.vortex` file the JMH benchmark reads (see [`canonical`]) and runs
+//! the same lanes — full scan, native projection, and native filter — but entirely in Rust:
+//! `scan -> Arrow RecordBatch`, with no JNI crossing and no Arrow C Data export. Comparing these
+//! numbers against the JMH `ops/s` isolates the cost of the JNI + Arrow C Data boundary from the
+//! underlying format read.
+//!
+//! Like the JMH side, every lane scans the full table and reports the produced row count, so
+//!
`ItemsCount::new(ROWS)` makes Divan print **input rows scanned per second** — directly comparable +//! to JMH's `@OperationsPerInvocation(ROWS)` `ops/s`. Each chunk is materialized to Arrow (forcing +//! the decode) and the batch row count is taken; no per-value work is done, so the numbers reflect +//! scan + materialization, not consume-side arithmetic. +//! +//! Each lane runs in two modes: single-threaded (no pool workers — the consuming thread drives the +//! scan, mirroring the JNI default) and `*_pooled` (a background `CurrentThreadWorkerPool` sized to +//! `available_parallelism() - 1`). The Vortex -> Arrow conversion runs inside the scan's `map`, which +//! executes on the handle-spawned split tasks, so the pool parallelizes both the decode and the Arrow +//! conversion. Utf8View columns are downgraded to flat Arrow `Utf8` via a stripped target field, +//! exactly as the JNI path does, so both benches materialize the same types. + +#![expect(clippy::unwrap_used)] + +mod canonical; + +use std::sync::Arc; +use std::sync::LazyLock; + +use arrow_array::Array; +use arrow_schema::DataType; +use arrow_schema::Field; +use divan::Bencher; +use divan::counter::ItemsCount; +use vortex::VortexSessionDefault; +use vortex::array::VortexSessionExecute; +use vortex::array::arrow::ArrowSessionExt; +use vortex::dtype::DType; +use vortex::dtype::FieldName; +use vortex::error::VortexResult; +use vortex::error::vortex_err; +use vortex::expr::Expression; +use vortex::expr::get_item; +use vortex::expr::lit; +use vortex::expr::root; +use vortex::expr::select; +use vortex::file::OpenOptionsSessionExt; +use vortex::file::VortexFile; +use vortex::io::runtime::BlockingRuntime; +use vortex::io::runtime::current::CurrentThreadRuntime; +use vortex::io::runtime::current::CurrentThreadWorkerPool; +use vortex::io::session::RuntimeSessionExt; +use vortex::scalar_fn::ScalarFnVTableExt; +use vortex::scalar_fn::fns::binary::Binary; +use vortex::scalar_fn::fns::operators::Operator; +use 
vortex::session::VortexSession; +use vortex::utils::parallelism::get_available_parallelism; + +use crate::canonical::ROWS; + +/// Shared current-thread runtime and its background worker pool, mirroring the JNI's static +/// `RUNTIME`/`POOL`. One long-lived pool is used (it has no `Drop`, so a per-sample pool would leak +/// worker threads); `set_workers` adjusts the count per lane — `0` for the single-threaded lanes, +/// `available_parallelism() - 1` for the pooled ones. +static RUNTIME: LazyLock = LazyLock::new(CurrentThreadRuntime::new); +static POOL: LazyLock = LazyLock::new(|| RUNTIME.new_pool()); + +fn main() { + divan::main(); +} + +/// The lanes, mirroring `VortexJniReadBenchmark`. +#[derive(Clone, Copy)] +enum Lane { + /// Read all six columns. + FullScan, + /// Native projection of `id, y`. + Projection, + /// Native filter `cat = 'alpha'` (~1/16 selectivity). + SelectiveFilter, +} + +/// Read state opened once per Divan sample (outside the timed region) and reused across iterations. +struct Env { + session: VortexSession, + file: VortexFile, +} + +impl Env { + fn open() -> VortexResult { + let path = canonical::default_path(); + canonical::ensure_canonical(&path)?; + + let session = VortexSession::default().with_handle(RUNTIME.handle()); + let file = RUNTIME.block_on( + session + .open_options() + .with_layout_reader_cache() + .open_path(&path), + )?; + Ok(Self { session, file }) + } + + /// Scan the table for `lane`, materialize each chunk to Arrow, and return the total row count so + /// the read is observable without per-value work. + /// + /// The Vortex -> Arrow conversion runs inside the scan's `map`, which executes within each split + /// task spawned on the session's runtime handle — so with pool workers configured the decode AND + /// the Arrow conversion run in parallel in the background, rather than inline on this thread. 
+ fn run(&self, lane: Lane) -> VortexResult { + let mut builder = self.file.scan()?; + match lane { + Lane::Projection => builder = builder.with_projection(project_id_y()), + Lane::SelectiveFilter => builder = builder.with_filter(filter_cat_alpha()), + Lane::FullScan => {} + } + + // Downgrade Utf8View -> Utf8 (and BinaryView -> Binary), matching the JNI Arrow boundary. + let target = stripped_target(&builder.dtype()?)?; + let session = self.session.clone(); + + let mut rows = 0u64; + for batch in builder + .map(move |array| { + let mut ctx = session.create_execution_ctx(); + let arrow = session + .arrow() + .execute_arrow(array, Some(&target), &mut ctx)?; + Ok(arrow.len() as u64) + }) + .into_iter(&*RUNTIME)? + { + rows += batch?; + } + Ok(rows) + } +} + +/// Build the Arrow target field for `execute_arrow`, replacing view string/binary types with their +/// flat equivalents so the materialized types match what the JNI path hands to Java. +fn stripped_target(dtype: &DType) -> VortexResult { + let schema = dtype.to_arrow_schema()?; + let stripped = strip_views(DataType::Struct(schema.fields().clone())); + let DataType::Struct(fields) = stripped else { + return Err(vortex_err!("scan dtype did not export as an Arrow struct")); + }; + Ok(Field::new_struct("", fields, false)) +} + +fn strip_views(data_type: DataType) -> DataType { + match data_type { + DataType::Utf8View => DataType::Utf8, + DataType::BinaryView => DataType::Binary, + DataType::Struct(fields) => DataType::Struct( + fields + .iter() + .map(|f| { + Arc::new(Field::new( + f.name(), + strip_views(f.data_type().clone()), + f.is_nullable(), + )) + }) + .collect(), + ), + other => other, + } +} + +fn project_id_y() -> Expression { + select(vec![FieldName::from("id"), FieldName::from("y")], root()) +} + +fn filter_cat_alpha() -> Expression { + Binary.new_expr( + Operator::Eq, + [get_item(FieldName::from("cat"), root()), lit("alpha")], + ) +} + +/// Background worker count for the pooled lanes: one fewer than 
the available parallelism (the +/// consuming thread also drives the executor), at least one. +fn worker_threads() -> usize { + get_available_parallelism() + .map(|n| n.saturating_sub(1).max(1)) + .unwrap_or(1) +} + +fn run_lane(bencher: Bencher<'_, '_>, lane: Lane, workers: usize) { + POOL.set_workers(workers); + bencher + .with_inputs(|| Env::open().unwrap()) + .input_counter(|_| ItemsCount::new(ROWS)) + .bench_refs(move |env| env.run(lane).unwrap()); +} + +#[divan::bench] +fn full_scan(bencher: Bencher) { + run_lane(bencher, Lane::FullScan, 0); +} + +#[divan::bench] +fn projection(bencher: Bencher) { + run_lane(bencher, Lane::Projection, 0); +} + +#[divan::bench] +fn selective_filter(bencher: Bencher) { + run_lane(bencher, Lane::SelectiveFilter, 0); +} + +#[divan::bench] +fn full_scan_pooled(bencher: Bencher) { + run_lane(bencher, Lane::FullScan, worker_threads()); +} + +#[divan::bench] +fn projection_pooled(bencher: Bencher) { + run_lane(bencher, Lane::Projection, worker_threads()); +} + +#[divan::bench] +fn selective_filter_pooled(bencher: Bencher) { + run_lane(bencher, Lane::SelectiveFilter, worker_threads()); +} diff --git a/vortex-jni/examples/gen_bench_data.rs b/vortex-jni/examples/gen_bench_data.rs new file mode 100644 index 00000000000..615aaf11e2f --- /dev/null +++ b/vortex-jni/examples/gen_bench_data.rs @@ -0,0 +1,42 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! Generates the canonical read-boundary benchmark file shared by the Rust `read_boundary` bench +//! and the Java `VortexJniReadBenchmark` JMH benchmark. +//! +//! Run directly (`cargo run -p vortex-jni --example gen_bench_data -- [PATH]`) or via the Gradle +//! `generateBenchFile` task. With no argument it writes to [`canonical::default_path`]. Generation +//! is idempotent: an existing non-empty file is left untouched so both benches read the same bytes. 
+ +#[path = "../benches/canonical/mod.rs"] +mod canonical; + +use std::path::PathBuf; +use std::process::ExitCode; + +fn main() -> ExitCode { + let path: PathBuf = std::env::args() + .nth(1) + .map(PathBuf::from) + .unwrap_or_else(canonical::default_path); + + let existed = path.metadata().map(|m| m.len() > 0).unwrap_or(false); + match canonical::ensure_canonical(&path) { + Ok(()) => { + if existed { + println!("canonical bench file already present: {}", path.display()); + } else { + println!( + "wrote canonical bench file ({} rows): {}", + canonical::ROWS, + path.display() + ); + } + ExitCode::SUCCESS + } + Err(e) => { + eprintln!("failed to generate canonical bench file: {e}"); + ExitCode::FAILURE + } + } +} From 60deb99cd9fb433b890a2367b60b91fadbdd9e4e Mon Sep 17 00:00:00 2001 From: Robert Kruszewski Date: Thu, 25 Jun 2026 19:33:53 +0100 Subject: [PATCH 4/4] less --- java/vortex-jni/BENCHMARKS.md | 110 ---------------------------------- 1 file changed, 110 deletions(-) delete mode 100644 java/vortex-jni/BENCHMARKS.md diff --git a/java/vortex-jni/BENCHMARKS.md b/java/vortex-jni/BENCHMARKS.md deleted file mode 100644 index 8899642780d..00000000000 --- a/java/vortex-jni/BENCHMARKS.md +++ /dev/null @@ -1,110 +0,0 @@ -# vortex-jni read-boundary benchmarks - -Two benchmarks stress the **vortex-jni read boundary** — reading a Vortex file with column projection -and filter pushdown — from two sides that read the **same canonical file**: - -- **`VortexJniReadBenchmark`** (JMH, Java) — reads *through* JNI + the Arrow C Data Interface, the - path an Iceberg `FormatModel` takes to read Vortex from the JVM. Lives in this module's `jmh` - source set (`src/jmh/java/dev/vortex/bench/`). -- **`read_boundary`** (Divan, Rust) — the **native floor**: the same `scan → Arrow` work entirely in - Rust, with no JNI crossing and no Arrow C Data export. Lives in the `vortex-jni` *crate* - (`vortex-jni/benches/read_boundary.rs`). 
- -Because both read the exact same bytes, run the same lanes, and report **input rows scanned per -second**, the numbers are directly comparable: the gap between them is the cost of the JNI + Arrow C -Data boundary on top of the underlying format read. - -## The lanes - -- `fullScan` / `full_scan` — read all six columns. -- `projection` — native projection of `id,y` (two of six columns). -- `selectiveFilter` / `selective_filter` — native filter `cat = 'alpha'` (~1/16 selectivity). - -`projection` vs `fullScan` isolates native **projection** pushdown (same rows, fewer columns -materialized); `selectiveFilter` vs `fullScan` isolates native **filter** pushdown (same scan, fewer -rows produced). - -**Single-threaded vs pooled**: each lane runs in two threading modes. The single-threaded mode drives -the scan on the consuming thread only (the JNI default). The **pooled** mode adds a background worker -pool sized to `available_parallelism() - 1`, so the scan's split tasks run in parallel while the -consuming thread drains results. The Rust bench runs the Vortex→Arrow conversion inside the scan's -`map` (on those handle-spawned split tasks), so the pool parallelizes both the decode and the Arrow -conversion — column-heavy `full_scan` speeds up several-fold; lanes whose Arrow output is tiny -(`projection`, `selective_filter`) are unaffected. The Rust pooled lanes are `*_pooled` -(`full_scan_pooled`, …) backed by a `CurrentThreadWorkerPool`; the JMH side is parameterized by -`workerThreads` (`0` = single-threaded, `-1` = available parallelism) via `NativeRuntime`. - -Each lane scans the full table and **materializes every result batch to Arrow, then sums the batch -row counts** (`getRowCount()` / `RecordBatch::len()`) — no per-value work. So the numbers reflect -scan + materialization + (for JMH) the boundary, not consume-side arithmetic. `@OperationsPerInvocation(ROWS)` -(JMH) and `ItemsCount::new(ROWS)` (Divan) both make this **rows scanned/s**. 
Vortex coalesces to -~64K-row read batches regardless of the writer's chunk size (the `VortexJniBatchDiagnostic` tool -prints this), so boundary cost is amortized over large batches by construction. - -The JMH `@Setup` validates the file before any measurement (exact row count, `cat='alpha'` returns -exactly `ROWS/|CATS|`, projection schema is exactly `[id, y]`) and fails the trial otherwise — a -corrupt file or broken filter must not silently produce impressive throughput. - -**View types**: both benches downgrade Utf8View → Utf8 (and BinaryView → Binary) when materializing -to Arrow — the JMH path because the production `scanArrow` does so for Spark and other view-less -consumers, and the Rust bench via a matching stripped target field — so the two materialize identical -Arrow types. (With row-count consumption neither actually touches the string columns.) - -## The shared canonical file - -Both benches read one canonical `.vortex` file at `target/vortex-jni-bench/data.vortex`, written by -the **Rust generator** (`vortex-jni/benches/canonical/mod.rs`, exposed as the `gen_bench_data` -example). It is a deterministic, six-column, 2M-row table (2× int64, 2× float64, 2× Utf8View) built -from a fixed formula — no RNG — so it is byte-reproducible and the two benches measure identical data. -Generation is idempotent: an existing file is reused, so whichever side runs first writes it and the -other reads it. Delete the file to regenerate. 
- -## Running - -**Java (JMH), through the boundary** — the `jmh` task wires everything up: - -```bash -cd java -./gradlew :vortex-jni:jmh -``` - -It (a) builds a release-optimized native lib via `buildJmhNativeLib` -(`cargo build --profile release_debug -p vortex-jni`) and stages it on the runtime classpath — the -dev `makeTestFiles` debug build, which would make numbers meaningless, is skipped while the benchmark -is in the task graph; (b) generates the canonical file via `generateBenchFile`; and (c) passes the -Arrow `--add-opens` flags and `-Dvortex.jni.bench.file=` into the forked JVM. Results land in -`vortex-jni/build/results/jmh/results.txt`. - -**Rust (Divan), the native floor**: - -```bash -# from repo root -cargo bench -p vortex-jni --bench read_boundary -``` - -The bench generates the canonical file on first run if absent. (`release_debug` is the `release` -profile plus full debug info, good for profiling; it lives in `target/release_debug/`, separate from -`target/debug` and `target/release`.) - -**Batch-granularity diagnostic** — `VortexJniBatchDiagnostic`, a standalone tool (not a JMH -benchmark) that prints read-batch row counts per writer chunk size: - -```bash -cd java -./gradlew :vortex-jni:batchDiagnostic -``` - -## Comparing the two - -Line up the JMH `ops/s` against the Divan `rows/s` lane-for-lane. The Rust floor is faster on every -lane; the ratio is the JNI + Arrow C Data boundary overhead for that access pattern (it is largest -where the most data crosses the boundary, e.g. `fullScan`, and smallest where pushdown means little -crosses it, e.g. `projection`). Both are **synthetic, single-machine, warm-cache — directional, not a -leaderboard**: `id` is sequential, `cat` is a periodic 16-value low-cardinality column (non-null, so -filter selectivity is exactly 1/16), `tag` is high-cardinality with a 10% null rate. This shape is -friendly to compression and pushdown. 
- -## Future work - -- A less-compressible / higher-null workload (the current shape favors compression and pushdown). -- A wider/narrower-row and a multi-file variant.