Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion java/gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ junit-jupiter = "6.1.0"
logback = "1.5.33"
netty = "4.2.14.Final"
nopen = "1.0.1"
roaringbitmap = "1.6.14"
slf4j = "2.0.18"
spark3 = "3.5.8"
spark4 = "4.1.1"
Expand All @@ -30,7 +31,8 @@ logback-classic = { module = "ch.qos.logback:logback-classic", version.ref = "lo
netty-bom = { module = "io.netty:netty-bom", version.ref = "netty" }
nopen-annotations = { module = "com.jakewharton.nopen:nopen-annotations", version.ref = "nopen" }
nopen-checker = { module = "com.jakewharton.nopen:nopen-checker", version.ref = "nopen" }
roaringbitmap = { module = "org.roaringbitmap:RoaringBitmap", version.ref = "roaringbitmap" }
slf4j-api = { module = "org.slf4j:slf4j-api", version.ref = "slf4j" }
slf4j-simple = { module = "org.slf4j:slf4j-simple", version.ref = "slf4j" }
s3mock-testcontainers = { module = "com.adobe.testing:s3mock-testcontainers", version.ref = "s3mock" }
testcontainers-juputer = { module = "org.testcontainers:junit-jupiter", version.ref = "testcontainers-jupiter" }
testcontainers-juputer = { module = "org.testcontainers:junit-jupiter", version.ref = "testcontainers-jupiter" }
27 changes: 15 additions & 12 deletions java/vortex-jni/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ dependencies {
implementation(libs.guava)
compileOnly(libs.errorprone.annotations)
compileOnly(libs.nopen.annotations)
api(libs.roaringbitmap)

// Logging
implementation(libs.slf4j.api)
Expand Down Expand Up @@ -131,18 +132,20 @@ tasks.register("makeTestFiles") {

val osName = System.getProperty("os.name").lowercase()
val osArch = System.getProperty("os.arch").lowercase()
val osShortName = when {
osName.contains("mac") -> "darwin"
osName.contains("nix") || osName.contains("nux") -> "linux"
osName.contains("win") -> "win"
else -> throw GradleException("Unsupported OS for makeTestFiles: $osName")
}
val libExt = when (osShortName) {
"darwin" -> ".dylib"
"linux" -> ".so"
"win" -> ".dll"
else -> throw GradleException("Unsupported OS short name: $osShortName")
}
val osShortName =
when {
osName.contains("mac") -> "darwin"
osName.contains("nix") || osName.contains("nux") -> "linux"
osName.contains("win") -> "win"
else -> throw GradleException("Unsupported OS for makeTestFiles: $osName")
}
val libExt =
when (osShortName) {
"darwin" -> ".dylib"
"linux" -> ".so"
"win" -> ".dll"
else -> throw GradleException("Unsupported OS short name: $osShortName")
}

// Only populate the host-arch directory so cross-compiled libs for other
// architectures (placed by the publish workflow) are preserved.
Expand Down
33 changes: 30 additions & 3 deletions java/vortex-jni/src/main/java/dev/vortex/api/DataSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -136,13 +136,40 @@ public Scan scan(ScanOptions options) {
long filterPtr = options.filter().map(Expression::nativePointer).orElse(0L);
long begin = options.rowRangeBegin().orElse(0L);
long end = options.rowRangeEnd().orElse(0L);
long[] selectionIndices = options.selectionIndices().orElse(null);
byte selectionMode = options.selectionMode().code();
ScanOptions.SelectionMode selectionMode = options.selectionMode();
long[] selectionIndices = selectionIndices(options);
byte[] selectionRoaringBitmap = selectionRoaringBitmap(options);
long limit = options.limit().orElse(0L);
boolean ordered = options.ordered();

long scanPtr = dev.vortex.jni.NativeScan.create(
pointer, projectionPtr, filterPtr, begin, end, selectionIndices, selectionMode, limit, ordered);
pointer,
projectionPtr,
filterPtr,
begin,
end,
selectionIndices,
selectionRoaringBitmap,
selectionMode.code(),
limit,
ordered);
return Scan.fromPointer(session, scanPtr);
}

/**
 * Chooses the index-array payload that is forwarded to the native scan.
 *
 * @param options scan configuration whose selection mode and indices are consulted
 * @return the configured indices when the mode is {@code INCLUDE} or {@code EXCLUDE} (or {@code null} if none
 *     are set); {@code null} for every other mode
 */
private static long[] selectionIndices(ScanOptions options) {
    switch (options.selectionMode()) {
        case INCLUDE:
        case EXCLUDE:
            // Index-based modes are the only ones that carry a long[] payload.
            return options.selectionIndices().orElse(null);
        default:
            return null;
    }
}

/**
 * Chooses the serialized bitmap payload that is forwarded to the native scan.
 *
 * @param options scan configuration whose selection mode and bitmap are consulted
 * @return the serialized bitmap for {@code INCLUDE_ROARING} / {@code EXCLUDE_ROARING}; {@code null} for every
 *     other mode
 * @throws IllegalArgumentException if a roaring mode is selected but no bitmap is present
 */
private static byte[] selectionRoaringBitmap(ScanOptions options) {
    switch (options.selectionMode()) {
        case INCLUDE_ROARING:
        case EXCLUDE_ROARING:
            return options.selectionRoaringBitmap()
                    .orElseThrow(() -> new IllegalArgumentException(
                            "selection roaring bitmap is required for roaring selection modes"));
        default:
            // Non-roaring modes never pass bitmap bytes.
            return null;
    }
}
}
115 changes: 110 additions & 5 deletions java/vortex-jni/src/main/java/dev/vortex/api/ScanOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,15 @@

package dev.vortex.api;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
import org.immutables.value.Value;
import org.roaringbitmap.longlong.Roaring64NavigableMap;

/**
* Scan configuration passed to {@link DataSource#scan(ScanOptions)}.
Expand All @@ -28,12 +34,15 @@ public interface ScanOptions {
OptionalLong rowRangeEnd();

/**
* Sorted ascending row indices that should be included in (or excluded from) the scan, depending on
* Sorted ascending, unique row indices that should be included in (or excluded from) the scan, depending on
* {@link #selectionMode()}.
*/
Optional<long[]> selectionIndices();

/** Meaning of {@link #selectionIndices()}. */
/** Portable serialized {@link Roaring64NavigableMap} row selection. */
Optional<byte[]> selectionRoaringBitmap();

/** Meaning of the row selection payload. */
@Value.Default
default SelectionMode selectionMode() {
return SelectionMode.INCLUDE_ALL;
Expand All @@ -48,6 +57,54 @@ default boolean ordered() {
return false;
}

/**
 * Rejects combinations of selection payload and {@link #selectionMode()} that do not agree.
 *
 * <p>Checks run in a fixed order: both payloads present, index ordering, then payload/mode agreement.
 *
 * @throws IllegalArgumentException if both payloads are set, the indices are negative or not strictly
 *     ascending, a payload is set while the mode is {@code INCLUDE_ALL}, or the payload required by the
 *     chosen mode is missing or (for bitmaps) zero-length
 */
@Value.Check
default void validateSelectionPayload() {
    Optional<long[]> indices = selectionIndices();
    Optional<byte[]> roaring = selectionRoaringBitmap();

    if (indices.isPresent() && roaring.isPresent()) {
        throw new IllegalArgumentException("row selection must use either indices or roaring bitmap, not both");
    }
    // Index ordering is checked regardless of mode, before the mode-specific rules below.
    indices.ifPresent(ScanOptions::validateSelectionIndices);

    switch (selectionMode()) {
        case INCLUDE_ALL -> {
            boolean hasPayload = indices.isPresent() || roaring.isPresent();
            if (hasPayload) {
                throw new IllegalArgumentException("row selection payload requires a selection mode");
            }
        }
        case INCLUDE, EXCLUDE -> {
            if (indices.isEmpty()) {
                throw new IllegalArgumentException("selection indices are required for index selection modes");
            }
        }
        case INCLUDE_ROARING, EXCLUDE_ROARING -> {
            byte[] serialized = roaring.orElseThrow(() -> new IllegalArgumentException(
                    "selection roaring bitmap is required for roaring selection modes"));
            if (serialized.length == 0) {
                throw new IllegalArgumentException("selection roaring bitmap must not be empty");
            }
        }
    }
}

/**
 * Verifies that the indices are non-negative and strictly ascending.
 *
 * @param selectionIndices candidate row indices; an empty array passes
 * @throws IllegalArgumentException if any index is negative, or is not greater than the one before it
 */
private static void validateSelectionIndices(long[] selectionIndices) {
    // -1 lies below every accepted index, so the first element can never fail the ordering check.
    long floor = -1L;
    for (long candidate : selectionIndices) {
        if (candidate < 0) {
            throw new IllegalArgumentException("selection indices must be non-negative");
        }
        if (candidate <= floor) {
            throw new IllegalArgumentException("selection indices must be sorted ascending and unique");
        }
        floor = candidate;
    }
}

/** Returns scan options built without setting any attribute on the builder. */
static ScanOptions of() {
    ImmutableScanOptions.Builder untouched = ImmutableScanOptions.builder();
    return untouched.build();
}
Expand All @@ -56,14 +113,62 @@ static ImmutableScanOptions.Builder builder() {
return ImmutableScanOptions.builder();
}

/** How to interpret {@link #selectionIndices()}. */
/**
 * Builds options that scan only the rows at the given indices.
 *
 * @param rowIndices sorted ascending, unique row indices; the array is cloned before it is stored
 * @return options using {@link SelectionMode#INCLUDE}
 */
static ScanOptions includeRows(long... rowIndices) {
    long[] ownedIndices = rowIndices.clone();
    return builder()
            .selectionMode(SelectionMode.INCLUDE)
            .selectionIndices(ownedIndices)
            .build();
}

/**
 * Builds options that scan every row except those at the given indices.
 *
 * @param rowIndices sorted ascending, unique row indices; the array is cloned before it is stored
 * @return options using {@link SelectionMode#EXCLUDE}
 */
static ScanOptions excludeRows(long... rowIndices) {
    long[] ownedIndices = rowIndices.clone();
    return builder()
            .selectionMode(SelectionMode.EXCLUDE)
            .selectionIndices(ownedIndices)
            .build();
}

/**
 * Builds options that scan only the rows contained in the given Roaring bitmap.
 *
 * @param rowSelection bitmap of row indices; must not be {@code null}
 * @return options using {@link SelectionMode#INCLUDE_ROARING} and carrying the serialized bitmap
 */
static ScanOptions includeRows(Roaring64NavigableMap rowSelection) {
    byte[] serialized = serializeRoaringBitmap(rowSelection);
    return builder()
            .selectionMode(SelectionMode.INCLUDE_ROARING)
            .selectionRoaringBitmap(serialized)
            .build();
}

/**
 * Builds options that scan every row except those contained in the given Roaring bitmap.
 *
 * @param rowSelection bitmap of row indices; must not be {@code null}
 * @return options using {@link SelectionMode#EXCLUDE_ROARING} and carrying the serialized bitmap
 */
static ScanOptions excludeRows(Roaring64NavigableMap rowSelection) {
    byte[] serialized = serializeRoaringBitmap(rowSelection);
    return builder()
            .selectionMode(SelectionMode.EXCLUDE_ROARING)
            .selectionRoaringBitmap(serialized)
            .build();
}

/**
 * Writes the bitmap to a byte array using its portable serialization.
 *
 * @param rowSelection bitmap to serialize
 * @return the serialized bytes
 * @throws NullPointerException if {@code rowSelection} is {@code null}
 * @throws UncheckedIOException if writing to the in-memory stream reports an {@link IOException}
 */
private static byte[] serializeRoaringBitmap(Roaring64NavigableMap rowSelection) {
    Roaring64NavigableMap bitmap = Objects.requireNonNull(rowSelection, "rowSelection");
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    try (DataOutputStream sink = new DataOutputStream(buffer)) {
        bitmap.serializePortable(sink);
        sink.flush();
    } catch (IOException e) {
        throw new UncheckedIOException(e);
    }
    // The byte-array stream keeps its contents after the wrapper is closed.
    return buffer.toByteArray();
}

/** How to interpret the row selection payload. */
enum SelectionMode {
/** Ignore {@link #selectionIndices()}. */
/** Ignore row selection payloads. */
INCLUDE_ALL((byte) 0),
/** Return only rows at the indices. */
INCLUDE((byte) 1),
/** Return rows except those at the indices. */
EXCLUDE((byte) 2);
EXCLUDE((byte) 2),
/** Return only rows in the Roaring bitmap. */
INCLUDE_ROARING((byte) 3),
/** Return rows except those in the Roaring bitmap. */
EXCLUDE_ROARING((byte) 4);

private final byte code;

Expand Down
5 changes: 4 additions & 1 deletion java/vortex-jni/src/main/java/dev/vortex/jni/NativeScan.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ private NativeScan() {}
* @param rowRangeBegin inclusive start of the row range, 0 for "unbounded"
* @param rowRangeEnd exclusive end of the row range, 0 for "unbounded"
* @param selectionIndices sorted row indices; may be null
* @param selectionRoaringBitmap portable serialized Roaring64 bitmap; may be null
* @param selectionInclude {@code 0} (all), {@code 1} (include {@code selectionIndices}), {@code 2} (exclude
* {@code selectionIndices})
* {@code selectionIndices}), {@code 3} (include {@code selectionRoaringBitmap}), {@code 4} (exclude
* {@code selectionRoaringBitmap})
* @param limit max rows to return, or {@code 0} for "no limit"
* @param ordered true to preserve row order across partitions
*/
Expand All @@ -33,6 +35,7 @@ public static native long create(
long rowRangeBegin,
long rowRangeEnd,
long[] selectionIndices,
byte[] selectionRoaringBitmap,
byte selectionInclude,
long limit,
boolean ordered);
Expand Down
77 changes: 77 additions & 0 deletions java/vortex-jni/src/test/java/dev/vortex/api/TestMinimal.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

import dev.vortex.arrow.ArrowAllocation;
import dev.vortex.jni.NativeLoader;
Expand All @@ -31,6 +32,7 @@
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.roaringbitmap.longlong.Roaring64NavigableMap;

public final class TestMinimal {
static final class Person {
Expand Down Expand Up @@ -180,6 +182,73 @@ public void testProjectedScanWithFilter() throws Exception {
assertEquals(List.of(new Person("John", BigDecimal.valueOf(10_000L, 2), "VA")), people);
}

/** An index-based include selection returns exactly rows 0, 3 and 9 of the fixture. */
@Test
public void testSelectionIncludesRows() throws Exception {
    BufferAllocator allocator = ArrowAllocation.rootAllocator();
    Session session = Session.create();
    DataSource ds = DataSource.open(session, writePath);

    ScanOptions options = ScanOptions.includeRows(0, 3, 9);
    List<Person> actual = readAll(ds, options, allocator, TestMinimal::readFullBatch);

    List<Person> expected = List.of(MINIMAL_DATA.get(0), MINIMAL_DATA.get(3), MINIMAL_DATA.get(9));
    assertEquals(expected, actual);
}

/** An index-based exclude selection of rows 0 and 9 leaves fixture rows 1 through 8. */
@Test
public void testSelectionExcludesRows() throws Exception {
    BufferAllocator allocator = ArrowAllocation.rootAllocator();
    Session session = Session.create();
    DataSource ds = DataSource.open(session, writePath);

    ScanOptions options = ScanOptions.excludeRows(0, 9);
    List<Person> actual = readAll(ds, options, allocator, TestMinimal::readFullBatch);

    List<Person> expected = MINIMAL_DATA.subList(1, 9);
    assertEquals(expected, actual);
}

/** A Roaring-bitmap include selection returns exactly rows 0, 3 and 9 of the fixture. */
@Test
public void testRoaringSelectionIncludesRows() throws Exception {
    BufferAllocator allocator = ArrowAllocation.rootAllocator();
    Session session = Session.create();
    DataSource ds = DataSource.open(session, writePath);

    Roaring64NavigableMap wantedRows = roaringRows(0, 3, 9);
    ScanOptions options = ScanOptions.includeRows(wantedRows);
    List<Person> actual = readAll(ds, options, allocator, TestMinimal::readFullBatch);

    List<Person> expected = List.of(MINIMAL_DATA.get(0), MINIMAL_DATA.get(3), MINIMAL_DATA.get(9));
    assertEquals(expected, actual);
}

/** A Roaring-bitmap exclude selection of rows 0 and 9 leaves fixture rows 1 through 8. */
@Test
public void testRoaringSelectionExcludesRows() throws Exception {
    BufferAllocator allocator = ArrowAllocation.rootAllocator();
    Session session = Session.create();
    DataSource ds = DataSource.open(session, writePath);

    Roaring64NavigableMap skippedRows = roaringRows(0, 9);
    ScanOptions options = ScanOptions.excludeRows(skippedRows);
    List<Person> actual = readAll(ds, options, allocator, TestMinimal::readFullBatch);

    List<Person> expected = MINIMAL_DATA.subList(1, 9);
    assertEquals(expected, actual);
}

/** Descending indices are rejected with the ordering message. */
@Test
public void testSelectionIndicesMustBeSortedAndUnique() {
    String expectedMessage = "selection indices must be sorted ascending and unique";

    IllegalArgumentException thrown =
            assertThrows(IllegalArgumentException.class, () -> ScanOptions.includeRows(2, 1));

    assertEquals(expectedMessage, thrown.getMessage());
}

/** Supplying both an index array and a bitmap payload is rejected at build time. */
@Test
public void testSelectionPayloadMustChooseIndicesOrRoaring() {
    String expectedMessage = "row selection must use either indices or roaring bitmap, not both";

    IllegalArgumentException thrown = assertThrows(
            IllegalArgumentException.class,
            () -> ScanOptions.builder()
                    .selectionMode(ScanOptions.SelectionMode.INCLUDE)
                    .selectionIndices(new long[] {0})
                    .selectionRoaringBitmap(new byte[] {1})
                    .build());

    assertEquals(expectedMessage, thrown.getMessage());
}

/** Setting indices while leaving the selection mode at its default is rejected at build time. */
@Test
public void testSelectionPayloadRequiresMode() {
    String expectedMessage = "row selection payload requires a selection mode";

    IllegalArgumentException thrown = assertThrows(
            IllegalArgumentException.class,
            () -> ScanOptions.builder().selectionIndices(new long[] {0}).build());

    assertEquals(expectedMessage, thrown.getMessage());
}

/**
 * Reads the {@link Person} rows out of a {@link VectorSchemaRoot}.
 *
 * <p>Marked as a functional interface because callers supply it as a method reference; the annotation makes
 * the compiler reject any accidental second abstract method.
 */
@FunctionalInterface
private interface BatchReader {
    /**
     * @param root the batch to read from
     * @return the rows decoded from {@code root}
     */
    List<Person> read(VectorSchemaRoot root);
}
Expand Down Expand Up @@ -221,4 +290,12 @@ private static List<Person> readFullBatch(VectorSchemaRoot root) {
}
return result;
}

/**
 * Builds a bitmap holding the given row indices.
 *
 * @param rows row indices to add, in the order given
 * @return a new bitmap containing every value in {@code rows}
 */
private static Roaring64NavigableMap roaringRows(long... rows) {
    Roaring64NavigableMap selected = new Roaring64NavigableMap();
    for (int i = 0; i < rows.length; i++) {
        selected.addLong(rows[i]);
    }
    return selected;
}
}
1 change: 1 addition & 0 deletions vortex-jni/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ futures = { workspace = true }
jni = { workspace = true }
object_store = { workspace = true, features = ["aws", "azure", "gcp"] }
parking_lot = { workspace = true }
roaring = { workspace = true }
thiserror = { workspace = true }
tracing = { workspace = true, features = ["std", "log"] }
tracing-subscriber = { workspace = true, features = ["env-filter"] }
Expand Down
Loading
Loading