From f426275cc11e280789c4f11ef6d635f2cc7a2aff Mon Sep 17 00:00:00 2001 From: lurnagao-dahua Date: Sun, 1 Feb 2026 20:04:02 +0800 Subject: [PATCH 1/5] support page skipping when using vectorized Parquet reader --- .../vectorized/VectorizedArrowReader.java | 18 +- .../BaseVectorizedParquetValuesReader.java | 22 +- .../vectorized/parquet/ParquetReadState.java | 204 +++++++++++++ .../parquet/VectorizedColumnIterator.java | 153 +++++----- .../VectorizedDeltaEncodedValuesReader.java | 10 + .../parquet/VectorizedPageIterator.java | 170 +++-------- ...ectorizedParquetDefinitionLevelReader.java | 181 ++++++++++-- .../parquet/VectorizedPlainValuesReader.java | 65 +++++ .../parquet/VectorizedValuesReader.java | 35 +++ .../org/apache/iceberg/parquet/Parquet.java | 58 +++- .../iceberg/parquet/ParquetFilters.java | 7 +- .../org/apache/iceberg/parquet/ReadConf.java | 20 +- .../iceberg/parquet/ValuesAsBytesReader.java | 6 +- .../parquet/VectorizedParquetReader.java | 2 +- .../iceberg/spark/data/GenericsHelpers.java | 15 + ...estParquetPageSkippingVectorizedReads.java | 276 ++++++++++++++++++ ...estParquetPageSkippingVectorizedReads.java | 276 ++++++++++++++++++ ...estParquetPageSkippingVectorizedReads.java | 276 ++++++++++++++++++ ...estParquetPageSkippingVectorizedReads.java | 276 ++++++++++++++++++ 19 files changed, 1817 insertions(+), 253 deletions(-) create mode 100644 arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/ParquetReadState.java create mode 100644 spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetPageSkippingVectorizedReads.java create mode 100644 spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetPageSkippingVectorizedReads.java create mode 100644 spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetPageSkippingVectorizedReads.java create mode 100644 spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetPageSkippingVectorizedReads.java diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java index 2cc7cde4541a..fde62031c761 100644 --- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java +++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java @@ -43,6 +43,7 @@ import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.arrow.ArrowAllocation; import org.apache.iceberg.arrow.ArrowSchemaUtil; +import org.apache.iceberg.arrow.vectorized.parquet.ParquetReadState; import org.apache.iceberg.arrow.vectorized.parquet.VectorizedColumnIterator; import org.apache.iceberg.parquet.ParquetUtil; import org.apache.iceberg.parquet.VectorizedReader; @@ -372,11 +373,13 @@ private void allocateVectorBasedOnTypeName(PrimitiveType primitive, Field arrowF @Override public void setRowGroupInfo(PageReadStore source, Map metadata) { + ParquetReadState readState = new ParquetReadState(source.getRowIndexes().orElse(null)); ColumnChunkMetaData chunkMetaData = metadata.get(ColumnPath.get(columnDescriptor.getPath())); this.dictionary = vectorizedColumnIterator.setRowGroupInfo( source.getPageReader(columnDescriptor), - !ParquetUtil.hasNonDictionaryPages(chunkMetaData)); + !ParquetUtil.hasNonDictionaryPages(chunkMetaData), + readState); } @Override @@ -649,6 +652,9 @@ private static final class PositionVectorReader extends VectorizedArrowReader { private long rowStart; private int batchSize; private NullabilityHolder nulls; + private ParquetReadState readState; + private long readOrder; + private long curRowPosInRowGroup; PositionVectorReader(boolean setArrowValidityVector) { super(MetadataColumns.ROW_POSITION); @@ -667,7 +673,11 @@ public VectorHolder read(VectorHolder reuse, int numValsToRead) { ArrowBuf dataBuffer = vec.getDataBuffer(); for (int i = 0; i < numValsToRead; i += 1) { - dataBuffer.setLong((long) i * Long.BYTES, rowStart + i); + curRowPosInRowGroup = + readState.getReadOrderToRowGroupPosMap().getOrDefault(readOrder, curRowPosInRowGroup); + dataBuffer.setLong((long) i * Long.BYTES, rowStart + curRowPosInRowGroup); + readOrder++; + curRowPosInRowGroup++; } if (setArrowValidityVector) { @@ -677,7 +687,6 @@ public VectorHolder read(VectorHolder reuse, int numValsToRead) { } } - rowStart += numValsToRead; vec.setValueCount(numValsToRead); return new VectorHolder.PositionVectorHolder(vec, MetadataColumns.ROW_POSITION, nulls); @@ -700,6 +709,9 @@ public void setRowGroupInfo( () -> new IllegalArgumentException( "PageReadStore does not contain row index offset")); + readState = new ParquetReadState(source.getRowIndexes().orElse(null)); + readOrder = 0; + curRowPosInRowGroup = 0; } @Override diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/BaseVectorizedParquetValuesReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/BaseVectorizedParquetValuesReader.java index b0a75cceb5b3..02ecd220433e 100644 --- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/BaseVectorizedParquetValuesReader.java +++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/BaseVectorizedParquetValuesReader.java @@ -217,7 +217,7 @@ public boolean readBoolean() { @Override public void skip() { - throw new UnsupportedOperationException(); + this.readInteger(); } @Override @@ -240,4 +240,24 @@ public int readInteger() { } throw new RuntimeException("Unrecognized mode: " + mode); } + + /** Skip `n` values from the current reader. */ + public void skipValues(int total) { + int left = total; + while (left > 0) { + if (this.currentCount == 0) { + this.readNextGroup(); + } + int num = Math.min(left, this.currentCount); + switch (mode) { + case RLE: + break; + case PACKED: + packedValuesBufferIdx += num; + break; + } + currentCount -= num; + left -= num; + } + } } diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/ParquetReadState.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/ParquetReadState.java new file mode 100644 index 000000000000..bc5cee7a9abf --- /dev/null +++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/ParquetReadState.java @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.arrow.vectorized.parquet; + +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.PrimitiveIterator; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; + +public class ParquetReadState { + /** A special row range used when there is no row indexes (hence all rows must be included) */ + private static final RowRange MAX_ROW_RANGE = new RowRange(Long.MIN_VALUE, Long.MAX_VALUE); + + /** + * A special row range used when the row indexes are present AND all the row ranges have been + * processed. This serves as a sentinel at the end indicating that all rows come after the last + * row range should be skipped. + */ + private static final RowRange END_ROW_RANGE = new RowRange(Long.MAX_VALUE, Long.MIN_VALUE); + + /** + * The current index over all rows within the column chunk. This is used to check if the current + * row should be skipped by comparing against the row ranges. + */ + private long currentRowIndex; + + /** The offset in the current batch to put the next value in value vector */ + private int valueOffset; + + /** The remaining number of values to read in the current page */ + private int valuesToReadInPage; + + /** Iterator over all row ranges, only not-null if column index is present */ + private final Iterator rowRanges; + + /** The current row range */ + private RowRange currentRange; + + /** The remaining number of rows to read in the current batch */ + private int rowsToReadInBatch; + + /** The actual number of rows read in this batch on the current page, including skipped rows */ + private int rowsWithSkipsInThisBatch; + + /** + * Mapping from read order to actual position in row group. rowIndexes: [0, 1, 2, 4, 5, 7, 8, 9] + * -> rowRanges: [0-2], [4-5], [7-9] readOrderToRowGroupPosMap: [0 -> 0, 3 -> 4, 5 -> 7] + */ + private final Map readOrderToRowGroupPosMap = Maps.newHashMap(); + + public ParquetReadState(PrimitiveIterator.OfLong rowIndexes) { + this.rowRanges = constructRanges(rowIndexes); + nextRange(); + } + + /** Advance to the next range. */ + void nextRange() { + if (rowRanges == null) { + currentRange = MAX_ROW_RANGE; + } else if (!rowRanges.hasNext()) { + currentRange = END_ROW_RANGE; + } else { + currentRange = rowRanges.next(); + } + } + + public long currentRangeStart() { + return currentRange.getStart(); + } + + public long currentRangeEnd() { + return currentRange.getEnd(); + } + + /** Must be called at the beginning of reading a new page. */ + void resetForNewPage(int totalValuesInPage, long pageFirstRowIndex) { + this.valuesToReadInPage = totalValuesInPage; + this.currentRowIndex = pageFirstRowIndex; + } + + /** Must be called at the beginning of reading a new batch. */ + void resetForNewBatch(int batchSize) { + this.valueOffset = 0; + this.rowsToReadInBatch = batchSize; + } + + /** + * Construct a list of row ranges from the given `rowIndexes`. For example, suppose the + * `rowIndexes` are `[0, 1, 2, 4, 5, 7, 8, 9]`, it will be converted into 3 row ranges: `[0-2], + * [4-5], [7-9]`. + */ + private Iterator constructRanges(PrimitiveIterator.OfLong rowIndexes) { + if (rowIndexes == null) { + return null; + } + + List ranges = Lists.newArrayList(); + long currentStart = Long.MIN_VALUE; + long previous = Long.MIN_VALUE; + long readOrder = 0; + + while (rowIndexes.hasNext()) { + long idx = rowIndexes.nextLong(); + if (currentStart == Long.MIN_VALUE) { + currentStart = idx; + } else if (previous + 1 != idx) { + RowRange range = new RowRange(currentStart, previous); + readOrderToRowGroupPosMap.put(readOrder, currentStart); + readOrder += previous - currentStart + 1; + ranges.add(range); + currentStart = idx; + } + previous = idx; + } + + if (previous != Long.MIN_VALUE) { + ranges.add(new RowRange(currentStart, previous)); + readOrderToRowGroupPosMap.put(readOrder, currentStart); + } + + return ranges.iterator(); + } + + int getValuesToReadInPage() { + return valuesToReadInPage; + } + + void setValuesToReadInPage(int valuesToReadInPage) { + this.valuesToReadInPage = valuesToReadInPage; + } + + long getCurrentRowIndex() { + return currentRowIndex; + } + + void setCurrentRowIndex(long currentRowIndex) { + this.currentRowIndex = currentRowIndex; + } + + int getRowsToReadInBatch() { + return rowsToReadInBatch; + } + + public void setRowsToReadInBatch(int rowsToReadInBatch) { + this.rowsToReadInBatch = rowsToReadInBatch; + } + + public int getRowsWithSkipsInThisBatch() { + return rowsWithSkipsInThisBatch; + } + + public void setRowsWithSkipsInThisBatch(int rowsWithSkipsInThisBatch) { + this.rowsWithSkipsInThisBatch = rowsWithSkipsInThisBatch; + } + + public int getValueOffset() { + return valueOffset; + } + + public void setValueOffset(int valueOffset) { + this.valueOffset = valueOffset; + } + + public Map getReadOrderToRowGroupPosMap() { + return readOrderToRowGroupPosMap; + } + + /** Helper struct to represent a range of row indexes `[start, end]`. */ + private static class RowRange { + private final long start; + private final long end; + + RowRange(long start, long end) { + this.start = start; + this.end = end; + } + + public long getStart() { + return start; + } + + public long getEnd() { + return end; + } + } +} diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedColumnIterator.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedColumnIterator.java index 8e52fd1a28c3..cb81e05cceb0 100644 --- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedColumnIterator.java +++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedColumnIterator.java @@ -26,6 +26,7 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.column.Dictionary; +import org.apache.parquet.column.page.DataPage; import org.apache.parquet.column.page.PageReader; /** @@ -36,6 +37,7 @@ public class VectorizedColumnIterator extends BaseColumnIterator { private final VectorizedPageIterator vectorizedPageIterator; private int batchSize; + private ParquetReadState readState; public VectorizedColumnIterator( ColumnDescriptor desc, String writerVersion, boolean setArrowValidityVector) { @@ -51,10 +53,12 @@ public void setBatchSize(int batchSize) { this.batchSize = batchSize; } - public Dictionary setRowGroupInfo(PageReader store, boolean allPagesDictEncoded) { + public Dictionary setRowGroupInfo( + PageReader store, boolean allPagesDictEncoded, ParquetReadState state) { // setPageSource can result in a data page read. If that happens, we need // to know in advance whether all the pages in the row group are dictionary encoded or not this.vectorizedPageIterator.setAllPagesDictEncoded(allPagesDictEncoded); + this.readState = state; super.setPageSource(store); return dictionary; } @@ -64,167 +68,154 @@ protected BasePageIterator pageIterator() { return vectorizedPageIterator; } + @Override + protected void advance() { + if (readState.getValuesToReadInPage() == 0) { + BasePageIterator pageIterator = pageIterator(); + while (!pageIterator.hasNext()) { + DataPage page = pageSource.readPage(); + if (page != null) { + pageIterator.setPage(page); + readState.resetForNewPage(page.getValueCount(), page.getFirstRowIndex().orElse(0L)); + this.advanceNextPageCount += pageIterator.currentPageCount(); + } else { + return; + } + } + } + } + public boolean producesDictionaryEncodedVector() { return vectorizedPageIterator.producesDictionaryEncodedVector(); } public abstract class BatchReader { public void nextBatch(FieldVector fieldVector, int typeWidth, NullabilityHolder holder) { - int rowsReadSoFar = 0; - while (rowsReadSoFar < batchSize && hasNext()) { + readState.resetForNewBatch(batchSize); + while (readState.getRowsToReadInBatch() > 0 && hasNext()) { advance(); - int rowsInThisBatch = - nextBatchOf(fieldVector, batchSize - rowsReadSoFar, rowsReadSoFar, typeWidth, holder); - rowsReadSoFar += rowsInThisBatch; - triplesRead += rowsInThisBatch; - fieldVector.setValueCount(rowsReadSoFar); + readState.setRowsWithSkipsInThisBatch(0); + nextBatchOf(fieldVector, typeWidth, holder, readState); + triplesRead += readState.getRowsWithSkipsInThisBatch(); + fieldVector.setValueCount(batchSize - readState.getRowsToReadInBatch()); } } - protected abstract int nextBatchOf( - FieldVector vector, - int expectedBatchSize, - int numValsInVector, - int typeWidth, - NullabilityHolder holder); + protected abstract void nextBatchOf( + FieldVector vector, int typeWidth, NullabilityHolder holder, ParquetReadState state); } public class IntegerBatchReader extends BatchReader { @Override - protected int nextBatchOf( + protected void nextBatchOf( final FieldVector vector, - final int expectedBatchSize, - final int numValsInVector, final int typeWidth, - NullabilityHolder holder) { - return vectorizedPageIterator - .intPageReader() - .nextBatch(vector, expectedBatchSize, numValsInVector, typeWidth, holder); + NullabilityHolder holder, + ParquetReadState state) { + vectorizedPageIterator.intPageReader().nextBatch(vector, typeWidth, holder, state); } } public class DictionaryBatchReader extends BatchReader { @Override - protected int nextBatchOf( + protected void nextBatchOf( final FieldVector vector, - final int expectedBatchSize, - final int numValsInVector, final int typeWidth, - NullabilityHolder holder) { - return vectorizedPageIterator.nextBatchDictionaryIds( - (IntVector) vector, expectedBatchSize, numValsInVector, holder); + NullabilityHolder holder, + ParquetReadState state) { + vectorizedPageIterator.nextBatchDictionaryIds((IntVector) vector, holder, state); } } public class LongBatchReader extends BatchReader { @Override - protected int nextBatchOf( + protected void nextBatchOf( final FieldVector vector, - final int expectedBatchSize, - final int numValsInVector, final int typeWidth, - NullabilityHolder holder) { - return vectorizedPageIterator - .longPageReader() - .nextBatch(vector, expectedBatchSize, numValsInVector, typeWidth, holder); + NullabilityHolder holder, + ParquetReadState state) { + vectorizedPageIterator.longPageReader().nextBatch(vector, typeWidth, holder, state); } } public class TimestampMillisBatchReader extends BatchReader { @Override - protected int nextBatchOf( + protected void nextBatchOf( final FieldVector vector, - final int expectedBatchSize, - final int numValsInVector, final int typeWidth, - NullabilityHolder holder) { - return vectorizedPageIterator + NullabilityHolder holder, + ParquetReadState state) { + vectorizedPageIterator .timestampMillisPageReader() - .nextBatch(vector, expectedBatchSize, numValsInVector, typeWidth, holder); + .nextBatch(vector, typeWidth, holder, state); } } public class TimestampInt96BatchReader extends BatchReader { @Override - protected int nextBatchOf( + protected void nextBatchOf( final FieldVector vector, - final int expectedBatchSize, - final int numValsInVector, final int typeWidth, - NullabilityHolder holder) { - return vectorizedPageIterator - .timestampInt96PageReader() - .nextBatch(vector, expectedBatchSize, numValsInVector, typeWidth, holder); + NullabilityHolder holder, + ParquetReadState state) { + vectorizedPageIterator.timestampInt96PageReader().nextBatch(vector, typeWidth, holder, state); } } public class FloatBatchReader extends BatchReader { @Override - protected int nextBatchOf( + protected void nextBatchOf( final FieldVector vector, - final int expectedBatchSize, - final int numValsInVector, final int typeWidth, - NullabilityHolder holder) { - return vectorizedPageIterator - .floatPageReader() - .nextBatch(vector, expectedBatchSize, numValsInVector, typeWidth, holder); + NullabilityHolder holder, + ParquetReadState state) { + vectorizedPageIterator.floatPageReader().nextBatch(vector, typeWidth, holder, state); } } public class DoubleBatchReader extends BatchReader { @Override - protected int nextBatchOf( + protected void nextBatchOf( final FieldVector vector, - final int expectedBatchSize, - final int numValsInVector, final int typeWidth, - NullabilityHolder holder) { - return vectorizedPageIterator - .doublePageReader() - .nextBatch(vector, expectedBatchSize, numValsInVector, typeWidth, holder); + NullabilityHolder holder, + ParquetReadState state) { + vectorizedPageIterator.doublePageReader().nextBatch(vector, typeWidth, holder, state); } } public class FixedSizeBinaryBatchReader extends BatchReader { @Override - protected int nextBatchOf( + protected void nextBatchOf( final FieldVector vector, - final int expectedBatchSize, - final int numValsInVector, final int typeWidth, - NullabilityHolder holder) { - return vectorizedPageIterator + NullabilityHolder holder, + ParquetReadState state) { + vectorizedPageIterator .fixedSizeBinaryPageReader() - .nextBatch(vector, expectedBatchSize, numValsInVector, typeWidth, holder); + .nextBatch(vector, typeWidth, holder, state); } } public class VarWidthTypeBatchReader extends BatchReader { @Override - protected int nextBatchOf( + protected void nextBatchOf( final FieldVector vector, - final int expectedBatchSize, - final int numValsInVector, final int typeWidth, - NullabilityHolder holder) { - return vectorizedPageIterator - .varWidthTypePageReader() - .nextBatch(vector, expectedBatchSize, numValsInVector, typeWidth, holder); + NullabilityHolder holder, + ParquetReadState state) { + vectorizedPageIterator.varWidthTypePageReader().nextBatch(vector, typeWidth, holder, state); } } public class BooleanBatchReader extends BatchReader { @Override - protected int nextBatchOf( + protected void nextBatchOf( final FieldVector vector, - final int expectedBatchSize, - final int numValsInVector, final int typeWidth, - NullabilityHolder holder) { - return vectorizedPageIterator - .booleanPageReader() - .nextBatch(vector, expectedBatchSize, numValsInVector, typeWidth, holder); + NullabilityHolder holder, + ParquetReadState state) { + vectorizedPageIterator.booleanPageReader().nextBatch(vector, typeWidth, holder, state); } } diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedDeltaEncodedValuesReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedDeltaEncodedValuesReader.java index 115518e1fb50..baa877987e21 100644 --- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedDeltaEncodedValuesReader.java +++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedDeltaEncodedValuesReader.java @@ -132,11 +132,21 @@ public void readIntegers(int total, FieldVector vec, int rowId) { readValues(total, vec, rowId, INT_SIZE, (f, i, v) -> f.getDataBuffer().setInt(i, (int) v)); } + @Override + public void skipIntegers(int total) { + readValues(total, null, -1, INT_SIZE, (f, i, v) -> {}); + } + @Override public void readLongs(int total, FieldVector vec, int rowId) { readValues(total, vec, rowId, LONG_SIZE, (f, i, v) -> f.getDataBuffer().setLong(i, v)); } + @Override + public void skipLongs(int total) { + readValues(total, null, -1, LONG_SIZE, (f, i, v) -> {}); + } + /** DELTA_BINARY_PACKED only supports INT32 and INT64 */ @Override public void readFloats(int total, FieldVector vec, int rowId) { diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPageIterator.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPageIterator.java index be1a3324ae43..18e6737125fe 100644 --- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPageIterator.java +++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPageIterator.java @@ -155,81 +155,49 @@ protected void initDefinitionLevelsReader(DataPageV2 dataPageV2, ColumnDescripto * Method for reading a batch of dictionary ids from the dictionary encoded data pages. Like * definition levels, dictionary ids in Parquet are RLE/bin-packed encoded as well. */ - public int nextBatchDictionaryIds( - final IntVector vector, - final int expectedBatchSize, - final int numValsInVector, - NullabilityHolder holder) { - final int actualBatchSize = getActualBatchSize(expectedBatchSize); - if (actualBatchSize <= 0) { - return 0; - } + public void nextBatchDictionaryIds( + final IntVector vector, NullabilityHolder holder, ParquetReadState readState) { vectorizedDefinitionLevelReader .dictionaryIdReader() - .nextDictEncodedBatch( - vector, - numValsInVector, - -1, - actualBatchSize, - holder, - dictionaryEncodedValuesReader, - null); - triplesRead += actualBatchSize; - this.hasNext = triplesRead < triplesCount; - return actualBatchSize; + .nextDictEncodedBatch(vector, -1, holder, dictionaryEncodedValuesReader, null, readState); + this.hasNext = readState.getValuesToReadInPage() > 0; } abstract class BasePageReader { - public int nextBatch( - FieldVector vector, - int expectedBatchSize, - int numValsInVector, - int typeWidth, - NullabilityHolder holder) { - final int actualBatchSize = getActualBatchSize(expectedBatchSize); - if (actualBatchSize <= 0) { - return 0; - } + public void nextBatch( + FieldVector vector, int typeWidth, NullabilityHolder holder, ParquetReadState readState) { if (dictionaryDecodeMode == DictionaryDecodeMode.EAGER) { - nextDictEncodedVal(vector, actualBatchSize, numValsInVector, typeWidth, holder); + nextDictEncodedVal(vector, typeWidth, holder, readState); } else { - nextVal(vector, actualBatchSize, numValsInVector, typeWidth, holder); + nextVal(vector, typeWidth, holder, readState); } - triplesRead += actualBatchSize; - hasNext = triplesRead < triplesCount; - return actualBatchSize; + hasNext = readState.getValuesToReadInPage() > 0; } protected abstract void nextVal( - FieldVector vector, int batchSize, int numVals, int typeWidth, NullabilityHolder holder); + FieldVector vector, int typeWidth, NullabilityHolder holder, ParquetReadState readState); protected abstract void nextDictEncodedVal( - FieldVector vector, int batchSize, int numVals, int typeWidth, NullabilityHolder holder); + FieldVector vector, int typeWidth, NullabilityHolder holder, ParquetReadState readState); } /** Method for reading a batch of values of INT32 data type */ class IntPageReader extends BasePageReader { @Override protected void nextVal( - FieldVector vector, int batchSize, int numVals, int typeWidth, NullabilityHolder holder) { + FieldVector vector, int typeWidth, NullabilityHolder holder, ParquetReadState readState) { vectorizedDefinitionLevelReader .integerReader() - .nextBatch(vector, numVals, typeWidth, batchSize, holder, valuesReader); + .nextBatch(vector, typeWidth, holder, valuesReader, readState); } @Override protected void nextDictEncodedVal( - FieldVector vector, int batchSize, int numVals, int typeWidth, NullabilityHolder holder) { + FieldVector vector, int typeWidth, NullabilityHolder holder, ParquetReadState readState) { vectorizedDefinitionLevelReader .integerReader() .nextDictEncodedBatch( - vector, - numVals, - typeWidth, - batchSize, - holder, - dictionaryEncodedValuesReader, - dictionary); + vector, typeWidth, holder, dictionaryEncodedValuesReader, dictionary, readState); } } @@ -238,25 +206,19 @@ class LongPageReader extends BasePageReader { @Override protected void nextVal( - FieldVector vector, int batchSize, int numVals, int typeWidth, NullabilityHolder holder) { + FieldVector vector, int typeWidth, NullabilityHolder holder, ParquetReadState readState) { vectorizedDefinitionLevelReader .longReader() - .nextBatch(vector, numVals, typeWidth, batchSize, holder, valuesReader); + .nextBatch(vector, typeWidth, holder, valuesReader, readState); } @Override protected void nextDictEncodedVal( - FieldVector vector, int batchSize, int numVals, int typeWidth, NullabilityHolder holder) { + FieldVector vector, int typeWidth, NullabilityHolder holder, ParquetReadState readState) { vectorizedDefinitionLevelReader .longReader() .nextDictEncodedBatch( - vector, - numVals, - typeWidth, - batchSize, - holder, - dictionaryEncodedValuesReader, - dictionary); + vector, typeWidth, holder, dictionaryEncodedValuesReader, dictionary, readState); } } @@ -269,25 +231,19 @@ class TimestampMillisPageReader extends BasePageReader { @Override protected void nextVal( - FieldVector vector, int batchSize, int numVals, int typeWidth, NullabilityHolder holder) { + FieldVector vector, int typeWidth, NullabilityHolder holder, ParquetReadState readState) { vectorizedDefinitionLevelReader .timestampMillisReader() - .nextBatch(vector, numVals, typeWidth, batchSize, holder, valuesReader); + .nextBatch(vector, typeWidth, holder, valuesReader, readState); } @Override protected void nextDictEncodedVal( - FieldVector vector, int batchSize, int numVals, int typeWidth, NullabilityHolder holder) { + FieldVector vector, int typeWidth, NullabilityHolder holder, ParquetReadState readState) { vectorizedDefinitionLevelReader .timestampMillisReader() .nextDictEncodedBatch( - vector, - numVals, - typeWidth, - batchSize, - holder, - dictionaryEncodedValuesReader, - dictionary); + vector, typeWidth, holder, dictionaryEncodedValuesReader, dictionary, readState); } } @@ -295,25 +251,19 @@ protected void nextDictEncodedVal( class TimestampInt96PageReader extends BasePageReader { @Override protected void nextVal( - FieldVector vector, int batchSize, int numVals, int typeWidth, NullabilityHolder holder) { + FieldVector vector, int typeWidth, NullabilityHolder holder, ParquetReadState readState) { vectorizedDefinitionLevelReader .timestampInt96Reader() - .nextBatch(vector, numVals, typeWidth, batchSize, holder, valuesReader); + .nextBatch(vector, typeWidth, holder, valuesReader, readState); } @Override protected void nextDictEncodedVal( - FieldVector vector, int batchSize, int numVals, int typeWidth, NullabilityHolder holder) { + FieldVector vector, int typeWidth, NullabilityHolder holder, ParquetReadState readState) { vectorizedDefinitionLevelReader .timestampInt96Reader() .nextDictEncodedBatch( - vector, - numVals, - typeWidth, - batchSize, - holder, - dictionaryEncodedValuesReader, - dictionary); + vector, typeWidth, holder, dictionaryEncodedValuesReader, dictionary, readState); } } @@ -322,25 +272,19 @@ class FloatPageReader extends BasePageReader { @Override protected void nextVal( - FieldVector vector, int batchSize, int numVals, int typeWidth, NullabilityHolder holder) { + FieldVector vector, int typeWidth, NullabilityHolder holder, ParquetReadState readState) { vectorizedDefinitionLevelReader .floatReader() - .nextBatch(vector, numVals, typeWidth, batchSize, holder, valuesReader); + .nextBatch(vector, typeWidth, holder, valuesReader, readState); } @Override protected void nextDictEncodedVal( - FieldVector vector, int batchSize, int numVals, int typeWidth, NullabilityHolder holder) { + FieldVector vector, int typeWidth, NullabilityHolder holder, ParquetReadState readState) { vectorizedDefinitionLevelReader .floatReader() .nextDictEncodedBatch( - vector, - numVals, - typeWidth, - batchSize, - holder, - dictionaryEncodedValuesReader, - dictionary); + vector, typeWidth, holder, dictionaryEncodedValuesReader, dictionary, readState); } } @@ -349,54 +293,38 @@ class DoublePageReader extends BasePageReader { @Override protected void nextVal( - FieldVector vector, int batchSize, int numVals, int typeWidth, NullabilityHolder holder) { + FieldVector vector, int typeWidth, NullabilityHolder holder, ParquetReadState readState) { vectorizedDefinitionLevelReader .doubleReader() - .nextBatch(vector, numVals, typeWidth, batchSize, holder, valuesReader); + .nextBatch(vector, typeWidth, holder, valuesReader, readState); } @Override protected void nextDictEncodedVal( - FieldVector vector, int batchSize, int numVals, int typeWidth, NullabilityHolder holder) { + FieldVector vector, int typeWidth, NullabilityHolder holder, ParquetReadState readState) { vectorizedDefinitionLevelReader .doubleReader() .nextDictEncodedBatch( - vector, - numVals, - typeWidth, - batchSize, - holder, - dictionaryEncodedValuesReader, - dictionary); + vector, typeWidth, holder, dictionaryEncodedValuesReader, dictionary, readState); } } - private int getActualBatchSize(int expectedBatchSize) { - return Math.min(expectedBatchSize, triplesCount - triplesRead); - } - class FixedSizeBinaryPageReader extends BasePageReader { @Override protected void nextVal( - FieldVector vector, int batchSize, int numVals, int typeWidth, NullabilityHolder holder) { + FieldVector vector, int typeWidth, NullabilityHolder holder, ParquetReadState readState) { vectorizedDefinitionLevelReader .fixedSizeBinaryReader() - .nextBatch(vector, numVals, typeWidth, batchSize, holder, valuesReader); + .nextBatch(vector, typeWidth, holder, valuesReader, readState); } @Override protected void nextDictEncodedVal( - FieldVector vector, int batchSize, int numVals, int typeWidth, NullabilityHolder holder) { + FieldVector vector, int typeWidth, NullabilityHolder holder, ParquetReadState readState) { vectorizedDefinitionLevelReader .fixedSizeBinaryReader() .nextDictEncodedBatch( - vector, - numVals, - typeWidth, - batchSize, - holder, - dictionaryEncodedValuesReader, - dictionary); + vector, typeWidth, holder, dictionaryEncodedValuesReader, dictionary, readState); } } @@ -404,25 +332,19 @@ protected void nextDictEncodedVal( class VarWidthTypePageReader extends BasePageReader { @Override protected void nextVal( - FieldVector vector, int batchSize, int numVals, int typeWidth, NullabilityHolder holder) { + FieldVector vector, int typeWidth, NullabilityHolder holder, ParquetReadState readState) { vectorizedDefinitionLevelReader .varWidthReader() - .nextBatch(vector, numVals, typeWidth, batchSize, holder, valuesReader); + .nextBatch(vector, typeWidth, holder, valuesReader, readState); } @Override protected void nextDictEncodedVal( - FieldVector vector, int batchSize, int numVals, int typeWidth, NullabilityHolder holder) { + FieldVector vector, int typeWidth, NullabilityHolder holder, ParquetReadState readState) { vectorizedDefinitionLevelReader .varWidthReader() .nextDictEncodedBatch( - vector, - numVals, - typeWidth, - batchSize, - holder, - dictionaryEncodedValuesReader, - dictionary); + vector, typeWidth, holder, dictionaryEncodedValuesReader, dictionary, readState); } } @@ -430,15 +352,15 @@ protected void nextDictEncodedVal( class BooleanPageReader extends BasePageReader { @Override protected void nextVal( - FieldVector vector, int batchSize, int numVals, int typeWidth, NullabilityHolder holder) { + FieldVector vector, int typeWidth, NullabilityHolder holder, ParquetReadState readState) { vectorizedDefinitionLevelReader .booleanReader() - .nextBatch(vector, numVals, typeWidth, batchSize, holder, valuesReader); + .nextBatch(vector, typeWidth, holder, valuesReader, readState); } @Override protected void nextDictEncodedVal( - FieldVector vector, int batchSize, int numVals, int typeWidth, NullabilityHolder holder) { + FieldVector vector, int typeWidth, NullabilityHolder holder, ParquetReadState readState) { throw new UnsupportedOperationException(); } } diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java index 1ca3bfe809c0..66185f23f6af 100644 --- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java +++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java @@ -30,6 +30,7 @@ import org.apache.iceberg.arrow.vectorized.NullabilityHolder; import org.apache.iceberg.parquet.ParquetUtil; import org.apache.parquet.column.Dictionary; +import org.apache.parquet.column.values.ValuesReader; public final class VectorizedParquetDefinitionLevelReader extends BaseVectorizedParquetValuesReader { @@ -49,46 +50,82 @@ interface ReaderFunction { void apply(Mode mode, int idx, int numValues, byte[] byteArray, ArrowBuf validityBuffer); } + @FunctionalInterface + interface SkipValuesFunction { + void apply(Mode mode, int total); + } + abstract class CommonReader { private void nextBatch( final FieldVector vector, - final int startOffset, final int typeWidth, - final int numValsToRead, - ReaderFunction consumer) { - int idx = startOffset; - int left = numValsToRead; - while (left > 0) { + ReaderFunction consumer, + SkipValuesFunction skipValuesFunction, + ParquetReadState state) { + int leftInBatch = state.getRowsToReadInBatch(); + int leftInPage = state.getValuesToReadInPage(); + long rowId = state.getCurrentRowIndex(); + int rowsWithSkipsInThisBatch = 0; + while (leftInBatch > 0 && leftInPage > 0) { if (currentCount == 0) { readNextGroup(); } - int numValues = Math.min(left, currentCount); + int numValues = Math.min(leftInBatch, Math.min(leftInPage, currentCount)); + long rangeStart = state.currentRangeStart(); + long rangeEnd = state.currentRangeEnd(); byte[] byteArray = null; if (typeWidth > -1) { byteArray = new byte[typeWidth]; } ArrowBuf validityBuffer = vector.getValidityBuffer(); - - consumer.apply(mode, idx, numValues, byteArray, validityBuffer); - idx += numValues; - left -= numValues; - currentCount -= numValues; + // If [rowId, rowId + numValues) is wholly before or after the current row range, + // we skip to the start of the current row range or advance the current row range + if (rowId + numValues < rangeStart) { + skipValuesFunction.apply(mode, numValues); + rowId += numValues; + leftInPage -= numValues; + rowsWithSkipsInThisBatch += numValues; + } else if (rowId > rangeEnd) { + state.nextRange(); + } else { + // The range [rowId, rowId + numValues) overlaps with the current row range in state + long start = Math.max(rangeStart, rowId); + long end = Math.min(rangeEnd, rowId + numValues - 1); + // Skip the part [rowId, start) + int toSkip = (int) (start - rowId); + if (toSkip > 0) { + skipValuesFunction.apply(mode, toSkip); + rowId += toSkip; + leftInPage -= toSkip; + rowsWithSkipsInThisBatch += toSkip; + } + // Read the part [start, end] + numValues = (int) (end - start + 1); + consumer.apply(mode, state.getValueOffset(), numValues, byteArray, validityBuffer); + state.setValueOffset(state.getValueOffset() + numValues); + leftInBatch -= numValues; + leftInPage -= numValues; + rowId += numValues; + currentCount -= numValues; + state.setRowsToReadInBatch(state.getRowsToReadInBatch() - numValues); + rowsWithSkipsInThisBatch += numValues; + } } + state.setValuesToReadInPage(leftInPage); + state.setCurrentRowIndex(rowId); + state.setRowsWithSkipsInThisBatch(rowsWithSkipsInThisBatch); } public void nextBatch( final FieldVector vector, - final int startOffset, final int typeWidth, - final int numValsToRead, NullabilityHolder nullabilityHolder, - VectorizedValuesReader valuesReader) { + VectorizedValuesReader valuesReader, + ParquetReadState readState) { nextBatch( vector, - startOffset, typeWidth, - numValsToRead, (mode, idx, numValues, byteArray, validityBuffer) -> { switch (mode) { case RLE: @@ -99,22 +136,21 @@ public void nextBatch( nextPackedBatch( vector, typeWidth, nullabilityHolder, valuesReader, idx, numValues, byteArray); } - }); + }, + (mode, n) -> skipValues(mode, (ValuesReader) valuesReader, n, typeWidth), + readState); } public void nextDictEncodedBatch( final FieldVector vector, - final int startOffset, final int typeWidth, - final int numValsToRead, NullabilityHolder nullabilityHolder, VectorizedDictionaryEncodedParquetValuesReader valuesReader, - Dictionary dict) { + Dictionary dict, + ParquetReadState readState) { nextBatch( vector, - startOffset, typeWidth, - numValsToRead, (mode, idx, numValues, byteArray, validityBuffer) -> { switch (mode) { case RLE: @@ -139,7 +175,9 @@ public void nextDictEncodedBatch( numValues, validityBuffer); } - }); + }, + (mode, n) -> skipValues(mode, valuesReader, n, typeWidth), + readState); } protected abstract void nextRleBatch( @@ -211,6 +249,51 @@ protected abstract void nextDictEncodedVal( int numValues, NullabilityHolder holder, int typeWidth); + + /** + * Skip the next `n` values (either null or non-null) from this definition level reader and + * `valueReader`. + */ + public void skipValues(Mode mode, ValuesReader valuesReader, int total, int typeWidth) { + int left = total; + while (left > 0) { + if (currentCount == 0) { + readNextGroup(); + } + int num = Math.min(left, currentCount); + switch (mode) { + case RLE: + // we only need to skip non-null values from `valuesReader` since nulls are represented + // via definition levels which are skipped here via decrementing `currentCount`. + if (currentValue == maxDefLevel) { + if (valuesReader instanceof VectorizedValuesReader) { + skipValues((VectorizedValuesReader) valuesReader, num, typeWidth); + } else if (valuesReader instanceof VectorizedDictionaryEncodedParquetValuesReader) { + ((VectorizedDictionaryEncodedParquetValuesReader) valuesReader).skipValues(num); + } + } + break; + case PACKED: + int totalSkipNum = 0; + for (int i = 0; i < num; ++i) { + // same as above, only skip non-null values from `valuesReader` + if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) { + ++totalSkipNum; + } + } + if (valuesReader instanceof VectorizedValuesReader) { + skipValues((VectorizedValuesReader) valuesReader, totalSkipNum, typeWidth); + } else if (valuesReader instanceof VectorizedDictionaryEncodedParquetValuesReader) { + ((VectorizedDictionaryEncodedParquetValuesReader) valuesReader).skipValues(num); + } + break; + } + currentCount -= num; + left -= num; + } + } + + public abstract void skipValues(VectorizedValuesReader valuesReader, int total, int typeWidth); } abstract class NumericBaseReader extends CommonReader { @@ -308,6 +391,11 @@ public void nextVals( FieldVector vector, int rowId, VectorizedValuesReader valuesReader, int total) { valuesReader.readLongs(total, vector, rowId); } + + @Override + public void skipValues(VectorizedValuesReader valuesReader, int total, int typeWidth) { + valuesReader.skipLongs(total); + } } class DoubleReader extends NumericBaseReader { @@ -341,6 +429,11 @@ public void nextVals( FieldVector vector, int rowId, VectorizedValuesReader valuesReader, int total) { valuesReader.readDoubles(total, vector, rowId); } + + @Override + public void skipValues(VectorizedValuesReader valuesReader, int total, int typeWidth) { + valuesReader.skipDoubles(total); + } } class FloatReader extends NumericBaseReader { @@ -374,6 +467,11 @@ public void nextVals( FieldVector vector, int rowId, VectorizedValuesReader valuesReader, int total) { valuesReader.readFloats(total, vector, rowId); } + + @Override + public void skipValues(VectorizedValuesReader valuesReader, int total, int typeWidth) { + valuesReader.skipFloats(total); + } } class IntegerReader extends NumericBaseReader { @@ -409,6 +507,11 @@ public void nextVals( FieldVector vector, int rowId, VectorizedValuesReader valuesReader, int total) { valuesReader.readIntegers(total, vector, rowId); } + + @Override + public void skipValues(VectorizedValuesReader valuesReader, int total, int typeWidth) { + valuesReader.skipIntegers(total); + } } abstract class BaseReader extends CommonReader { @@ -481,6 +584,11 @@ protected void nextVal( vector.getDataBuffer().setLong((long) idx * typeWidth, valuesReader.readLong() * 1000); } + @Override + public void skipValues(VectorizedValuesReader valuesReader, int total, int typeWidth) { + valuesReader.skipLongs(total); + } + @Override protected void nextDictEncodedVal( FieldVector vector, @@ -517,6 +625,11 @@ protected void nextVal( vector.getDataBuffer().setLong((long) idx * typeWidth, timestampInt96); } + @Override + public void skipValues(VectorizedValuesReader valuesReader, int total, int typeWidth) { + valuesReader.skipFixedSizeBinary(total, 12); + } + @Override protected void nextDictEncodedVal( FieldVector vector, @@ -560,6 +673,11 @@ protected void nextVal( ((FixedSizeBinaryVector) vector).set(idx, byteArray); } + @Override + public void skipValues(VectorizedValuesReader valuesReader, int total, int typeWidth) { + valuesReader.skipFixedSizeBinary(total, typeWidth); + } + @Override protected void nextDictEncodedVal( FieldVector vector, @@ -607,6 +725,11 @@ protected void nextVal( } } + @Override + public void skipValues(VectorizedValuesReader valuesReader, int total, int typeWidth) { + valuesReader.skipBinary(total); + } + @Override protected void nextDictEncodedVal( FieldVector vector, @@ -639,6 +762,11 @@ protected void nextVal( ((BitVector) vector).setSafe(idx, valuesReader.readBoolean() ? 1 : 0); } + @Override + public void skipValues(VectorizedValuesReader valuesReader, int total, int typeWidth) { + valuesReader.skipBooleans(total); + } + @Override protected void nextDictEncodedVal( FieldVector vector, @@ -665,6 +793,11 @@ protected void nextVal( throw new UnsupportedOperationException(); } + @Override + public void skipValues(VectorizedValuesReader valuesReader, int total, int typeWidth) { + throw new UnsupportedOperationException(); + } + @Override protected void nextDictEncodedVal( FieldVector vector, diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPlainValuesReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPlainValuesReader.java index 764b2fc353e3..840f1793b6c4 100644 --- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPlainValuesReader.java +++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPlainValuesReader.java @@ -18,6 +18,7 @@ */ package org.apache.iceberg.arrow.vectorized.parquet; +import java.io.IOException; import java.nio.ByteBuffer; import org.apache.arrow.vector.FieldVector; import org.apache.iceberg.parquet.ValuesAsBytesReader; @@ -50,6 +51,27 @@ public Binary readBinary(int len) { } } + @Override + public void skipBinary(int total) { + try { + for (int i = 0; i < total; i++) { + int len = readInteger(); + getValuesInputStream().skipFully(len); + } + } catch (IOException e) { + throw new RuntimeException("Failed to skip " + total + " bytes", e); + } + } + + @Override + public void skipFixedSizeBinary(int total, int len) { + try { + getValuesInputStream().skipFully(total * (long) len); + } catch (IOException e) { + throw new RuntimeException("Failed to skip " + total + " bytes", e); + } + } + private void readValues(int total, FieldVector vec, int rowId, int typeWidth) { ByteBuffer buffer = getBuffer(total * typeWidth); vec.getDataBuffer().setBytes((long) rowId * typeWidth, buffer); @@ -60,18 +82,61 @@ public void readIntegers(int total, FieldVector vec, int rowId) { readValues(total, vec, rowId, INT_SIZE); } + @Override + public void skipIntegers(int total) { + try { + getValuesInputStream().skipFully((long) total * INT_SIZE); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + @Override public void readLongs(int total, FieldVector vec, int rowId) { readValues(total, vec, rowId, LONG_SIZE); } + @Override + public void skipLongs(int total) { + try { + getValuesInputStream().skipFully((long) total * LONG_SIZE); + } catch (IOException e) { + throw new RuntimeException("Failed to skip " + total + " bytes", e); + } + } + @Override public void readFloats(int total, FieldVector vec, int rowId) { readValues(total, vec, rowId, FLOAT_SIZE); } + @Override + public void skipFloats(int total) { + try { + getValuesInputStream().skipFully((long) total * FLOAT_SIZE); + } catch (IOException e) { + throw new RuntimeException("Failed to skip " + total + " bytes", e); + } + } + @Override public void readDoubles(int total, FieldVector vec, int rowId) { readValues(total, vec, rowId, DOUBLE_SIZE); } + + @Override + public void skipDoubles(int total) { + try { + getValuesInputStream().skipFully((long) total * DOUBLE_SIZE); + } catch (IOException e) { + throw new RuntimeException("Failed to skip " + total + " bytes", e); + } + } + + @Override + public void skipBooleans(int total) { + for (int i = 0; i < total; i++) { + readBooleanAsInt(); + } + } } diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedValuesReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedValuesReader.java index 7c23149b18ab..b853d7cbedba 100644 --- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedValuesReader.java +++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedValuesReader.java @@ -76,6 +76,41 @@ interface VectorizedValuesReader { /** Read `total` doubles into `vec` starting at `vec[rowId]` */ void readDoubles(int total, FieldVector vec, int rowId); + /* + * Skips `total` values + */ + default void skipBooleans(int total) { + throw new UnsupportedOperationException(); + } + + default void skipIntegers(int total) { + throw new UnsupportedOperationException(); + } + + default void skipLongs(int total) { + throw new UnsupportedOperationException(); + } + + default void skipFloats(int total) { + throw new UnsupportedOperationException(); + } + + default void skipDoubles(int total) { + throw new UnsupportedOperationException(); + } + + default void skipBinary(int total) { + throw new UnsupportedOperationException(); + } + + default void skipFixedSizeBinary(int total, int len) { + throw new UnsupportedOperationException(); + } + + default void skipBytes(int total) { + throw new UnsupportedOperationException(); + } + /** * Initialize the reader from a page. See {@link ValuesReader#initFromPage(int, * ByteBufferInputStream)}. diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java index 2b2e460ee994..a040f44a0a45 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java @@ -112,6 +112,7 @@ import org.apache.parquet.conf.PlainParquetConfiguration; import org.apache.parquet.crypto.FileDecryptionProperties; import org.apache.parquet.crypto.FileEncryptionProperties; +import org.apache.parquet.filter2.compat.FilterCompat; import org.apache.parquet.hadoop.ParquetFileReader; import org.apache.parquet.hadoop.ParquetFileWriter; import org.apache.parquet.hadoop.ParquetOutputFormat; @@ -1430,7 +1431,6 @@ public CloseableIterable build() { } optionsBuilder.withUseHadoopVectoredIo(true); - ParquetReadOptions options = optionsBuilder.build(); NameMapping mapping; if (nameMapping != null) { @@ -1442,10 +1442,11 @@ public CloseableIterable build() { } if (batchedReaderFunc != null) { + buildRecordFilter(optionsBuilder, fileDecryptionProperties); return new VectorizedParquetReader<>( file, schema, - options, + optionsBuilder.build(), batchedReaderFunc, mapping, filter, @@ -1461,7 +1462,14 @@ public CloseableIterable build() { .apply(); return new org.apache.iceberg.parquet.ParquetReader<>( - file, schema, options, readBuilder, mapping, filter, reuseContainers, caseSensitive); + file, + schema, + optionsBuilder.build(), + readBuilder, + mapping, + filter, + reuseContainers, + caseSensitive); } } @@ -1490,24 +1498,14 @@ public CloseableIterable build() { if (filter != null) { // TODO: should not need to get the schema to push down before opening the file. // Parquet should allow setting a filter inside its read support - ParquetReadOptions decryptOptions = - ParquetReadOptions.builder(new PlainParquetConfiguration()) - .withDecryption(fileDecryptionProperties) - .build(); - MessageType type; - try (ParquetFileReader schemaReader = - ParquetFileReader.open(ParquetIO.file(file), decryptOptions)) { - type = schemaReader.getFileMetaData().getSchema(); - } catch (IOException e) { - throw new RuntimeIOException(e); - } + MessageType type = getSchemaFromFile(fileDecryptionProperties); Schema fileSchema = ParquetSchemaUtil.convert(type); builder .useStatsFilter() .useDictionaryFilter() .useRecordFilter(filterRecords) .useBloomFilter() - .withFilter(ParquetFilters.convert(fileSchema, filter, caseSensitive)); + .withFilter(ParquetFilters.convert(fileSchema, filter, type, caseSensitive)); } else { // turn off filtering builder @@ -1535,6 +1533,36 @@ public CloseableIterable build() { return new ParquetIterable<>(builder); } + + private void buildRecordFilter( + ParquetReadOptions.Builder optionsBuilder, + FileDecryptionProperties fileDecryptionProperties) { + if (filter != null) { + MessageType type = getSchemaFromFile(fileDecryptionProperties); + Schema fileSchema = ParquetSchemaUtil.convert(type); + try { + FilterCompat.Filter parquetFilters = + ParquetFilters.convert(fileSchema, filter, type, caseSensitive); + optionsBuilder.useRecordFilter(); + optionsBuilder.withRecordFilter(parquetFilters); + } catch (RuntimeException e) { + LOG.warn("Cannot convert {} to parquet filter, skipping record filter", filter, e); + } + } + } + + private MessageType getSchemaFromFile(FileDecryptionProperties fileDecryptionProperties) { + ParquetReadOptions decryptOptions = + ParquetReadOptions.builder(new PlainParquetConfiguration()) + .withDecryption(fileDecryptionProperties) + .build(); + try (ParquetFileReader reader = + ParquetFileReader.open(ParquetIO.file(file), decryptOptions)) { + return reader.getFileMetaData().getSchema(); + } catch (IOException e) { + throw new RuntimeIOException(e); + } + } } private static class ParquetReadBuilder extends ParquetReader.Builder { diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetFilters.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetFilters.java index fc6febe19438..18dc94027f6c 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetFilters.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetFilters.java @@ -33,15 +33,20 @@ import org.apache.parquet.filter2.predicate.FilterApi; import org.apache.parquet.filter2.predicate.FilterPredicate; import org.apache.parquet.filter2.predicate.Operators; +import org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator; import org.apache.parquet.io.api.Binary; +import org.apache.parquet.schema.MessageType; class ParquetFilters { private ParquetFilters() {} - static FilterCompat.Filter convert(Schema schema, Expression expr, boolean caseSensitive) { + static FilterCompat.Filter convert( + Schema schema, Expression expr, MessageType type, boolean caseSensitive) { FilterPredicate pred = ExpressionVisitors.visit(expr, new ConvertFilterToParquet(schema, caseSensitive)); + // Reading int96 timestamps in imported data is incompatible + SchemaCompatibilityValidator.validate(pred, type); // TODO: handle AlwaysFalse.INSTANCE if (pred != null && pred != AlwaysTrue.INSTANCE) { // FilterCompat will apply LogicalInverseRewriter diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java b/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java index 1fb2372ba568..7b75d139b9bb 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java @@ -36,6 +36,8 @@ import org.apache.parquet.hadoop.metadata.BlockMetaData; import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; import org.apache.parquet.hadoop.metadata.ColumnPath; +import org.apache.parquet.internal.filter2.columnindex.ColumnIndexFilter; +import org.apache.parquet.internal.filter2.columnindex.ColumnIndexStore; import org.apache.parquet.schema.MessageType; /** @@ -112,10 +114,13 @@ class ReadConf { typeWithIds, rowGroup, reader.getBloomFilterDataReader(rowGroup))); this.shouldSkip[i] = !shouldRead; if (shouldRead) { - computedTotalValues += rowGroup.getRowCount(); + if (batchedReaderFunc != null && options.useRecordFilter()) { + computedTotalValues += getFilteredRecordCount(rowGroup, reader.getColumnIndexStore(i)); + } else { + computedTotalValues += rowGroup.getRowCount(); + } } } - this.totalValues = computedTotalValues; if (readerFunc != null) { this.model = (ParquetValueReader) readerFunc.apply(typeWithIds); @@ -220,4 +225,15 @@ private List> getColumnChunkMetadataForRowG } return listBuilder.build(); } + + private long getFilteredRecordCount(BlockMetaData blockMetaData, ColumnIndexStore ciStore) { + return ColumnIndexFilter.calculateRowRanges( + options.getRecordFilter(), + ciStore, + blockMetaData.getColumns().stream() + .map(ColumnChunkMetaData::getPath) + .collect(Collectors.toSet()), + blockMetaData.getRowCount()) + .rowCount(); + } } diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ValuesAsBytesReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/ValuesAsBytesReader.java index 71e10247af37..4c3dd62c1903 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ValuesAsBytesReader.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ValuesAsBytesReader.java @@ -102,11 +102,15 @@ public final int readBooleanAsInt() { return value; } - private byte getByte() { + public byte getByte() { try { return (byte) valuesInputStream.read(); } catch (IOException e) { throw new ParquetDecodingException("Failed to read a byte", e); } } + + public ByteBufferInputStream getValuesInputStream() { + return valuesInputStream; + } } diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java index fc10a57ec0e0..4fff716323d4 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java @@ -158,7 +158,7 @@ private void advance() { } PageReadStore pages; try { - pages = reader.readNextRowGroup(); + pages = reader.readFilteredRowGroup(nextRowGroup); } catch (IOException e) { throw new RuntimeIOException(e); } diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/GenericsHelpers.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/GenericsHelpers.java index 7d6e51a0dfab..149a47b8b848 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/GenericsHelpers.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/GenericsHelpers.java @@ -35,6 +35,7 @@ import java.time.temporal.ChronoUnit; import java.util.Collection; import java.util.Date; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.UUID; @@ -49,6 +50,7 @@ import org.apache.spark.sql.catalyst.util.ArrayData; import org.apache.spark.sql.catalyst.util.MapData; import org.apache.spark.sql.types.Decimal; +import org.apache.spark.sql.vectorized.ColumnarBatch; import org.apache.spark.unsafe.types.UTF8String; import scala.collection.Seq; @@ -58,6 +60,19 @@ private GenericsHelpers() {} private static final OffsetDateTime EPOCH = Instant.ofEpochMilli(0L).atOffset(ZoneOffset.UTC); private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate(); + public static void assertEqualsBatch( + Types.StructType struct, + Iterator expectedRecords, + ColumnarBatch batch, + Map idToConstant, + Integer batchFirstRowPos) { + for (int rowPos = 0; rowPos < batch.numRows(); rowPos++) { + InternalRow row = batch.getRow(rowPos); + Record expectedRecord = expectedRecords.next(); + assertEqualsUnsafe(struct, expectedRecord, row, idToConstant, batchFirstRowPos + rowPos); + } + } + public static void assertEqualsSafe(Types.StructType struct, Record expected, Row actual) { List fields = struct.fields(); for (int i = 0; i < fields.size(); i += 1) { diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetPageSkippingVectorizedReads.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetPageSkippingVectorizedReads.java new file mode 100644 index 000000000000..76b9781d37d9 --- /dev/null +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetPageSkippingVectorizedReads.java @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.data.parquet.vectorized; + +import static org.apache.iceberg.TableProperties.PARQUET_PAGE_ROW_LIMIT; +import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.iceberg.Files; +import org.apache.iceberg.MetadataColumns; +import org.apache.iceberg.Schema; +import org.apache.iceberg.data.DeleteFilter; +import org.apache.iceberg.data.RandomGenericData; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.data.parquet.GenericParquetWriter; +import org.apache.iceberg.deletes.PositionDeleteIndex; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.spark.data.AvroDataTestBase; +import org.apache.iceberg.spark.data.GenericsHelpers; +import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders; +import org.apache.iceberg.spark.source.BatchReaderUtil; +import org.apache.iceberg.util.Pair; +import org.apache.spark.sql.vectorized.ColumnarBatch; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +public class TestParquetPageSkippingVectorizedReads extends AvroDataTestBase { + + private static final Schema DATA_SCHEMA = new Schema(SUPPORTED_PRIMITIVES.fields()); + + @TempDir protected static Path temp; + + private static List allRecords = Lists.newArrayList(); + + /* Column and offset indexes info of `l` column in `testFile` copied from text printed by parquet-cli's + column-index command: + row-group 0: + column index for column l: + null count min max + page-0 0 0 99 + page-1 0 100 199 + page-2 0 200 299 + page-3 0 300 399 + page-4 0 400 499 + page-5 0 500 599 + page-6 0 600 618 + offset index for column l: + offset compressed size first row index + page-0 21370 213 0 + page-1 21583 213 100 + page-2 21796 213 200 + page-3 22009 216 300 + page-4 22225 217 400 + page-5 22442 216 500 + page-6 22658 87 600 + row-group 1: + column index for column l: + null count min max + page-0 0 619 718 + page-1 0 719 818 + page-2 0 819 918 + page-3 0 919 999 + offset index for column l: + offset compressed size first row index + page-0 117388 217 0 + page-1 117605 216 100 + page-2 117821 217 200 + page-3 118038 188 300 + */ + private static long index = -1; + private static File testFile; + + @BeforeAll + public static void generateFile() throws IOException { + Function transform = + record -> { + index += 1; + record.setField("l", index); + return record; + }; + int numRecords = 1000; + allRecords = + RandomGenericData.generate(DATA_SCHEMA, numRecords, 0L).stream() + .map(transform) + .collect(Collectors.toList()); + testFile = File.createTempFile("junit", null, temp.toFile()); + assertThat(testFile.delete()).as("Delete should succeed").isTrue(); + try (FileAppender fileAppender = + Parquet.write(Files.localOutput(testFile)) + .schema(DATA_SCHEMA) + .createWriterFunc(GenericParquetWriter::create) + .set(PARQUET_PAGE_ROW_LIMIT, "100") + .set(PARQUET_ROW_GROUP_SIZE_BYTES, "80000") // 2 row groups + .named("page-skipping-test") + .build()) { + fileAppender.addAll(allRecords.iterator()); + } + } + + @Test + public void testSinglePageMatch() { + Expression filter = + Expressions.and( + Expressions.greaterThanOrEqual("l", 150), + Expressions.lessThan("l", 200)); // page-1 in row group 0 -> row ranges: [100, 199] + + List expected = selectRecords(allRecords, Pair.of(100, 200)); + readAndValidate(filter, expected); + } + + @Test + public void testMultiplePagesMatch() { + Expression filter = + Expressions.or( + // page-1 in row group 0 -> row ranges: [100, 199] + Expressions.and( + Expressions.greaterThanOrEqual("l", 150), Expressions.lessThan("l", 200)), + // page-3, page-4 in row group 0 -> row ranges[300, 499] + Expressions.and( + Expressions.greaterThanOrEqual("l", 350), Expressions.lessThan("l", 450))); + + List expected = selectRecords(allRecords, Pair.of(100, 200), Pair.of(300, 500)); + readAndValidate(filter, expected); + } + + @Test + public void testMultipleRowGroupsMatch() { + Expression filter = + Expressions.or( + // page-1 in row group 0 -> row ranges: [100, 199] + Expressions.and( + Expressions.greaterThanOrEqual("l", 150), Expressions.lessThan("l", 200)), + // page-3, page-4 in row group 0 -> row ranges[300, 499] + Expressions.and( + Expressions.greaterThanOrEqual("l", 350), Expressions.lessThan("l", 450))); + filter = + Expressions.or( + filter, + // firstRowIndex in row group 1 -> 619 + // page-1, page-2 in row group 1 -> row ranges: [100, 299] + Expressions.and( + Expressions.greaterThanOrEqual("l", 750), Expressions.lessThan("l", 850))); + + List expected = + selectRecords(allRecords, Pair.of(100, 200), Pair.of(300, 500), Pair.of(719, 919)); + readAndValidate(filter, expected); + } + + @Test + public void testNoRowsMatch() { + Expression filter = Expressions.greaterThan("l", 1000); + readAndValidate(filter, ImmutableList.of()); + } + + @Test + public void testAllRowsMatch() { + Expression filter = Expressions.greaterThanOrEqual("l", Long.MIN_VALUE); + readAndValidate(filter, allRecords); + } + + @Test + public void testPageSkippingWithDelete() { + Schema projectSchema = + new Schema( + Stream.concat(DATA_SCHEMA.columns().stream(), Stream.of(MetadataColumns.ROW_POSITION)) + .collect(Collectors.toList())); + + DeleteFilter deleteFilter = mock(DeleteFilter.class); + when(deleteFilter.requiredSchema()).thenReturn(projectSchema); + when(deleteFilter.hasPosDeletes()).thenReturn(true); + when(deleteFilter.eqDeletedRowFilter()).thenReturn(null); + PositionDeleteIndex deletedRowPos = mock(PositionDeleteIndex.class); + + // pos 180, 800 are deleted in file + when(deletedRowPos.isDeleted(180L)).thenReturn(true); + when(deletedRowPos.isDeleted(800L)).thenReturn(true); + when(deleteFilter.deletedRowPositions()).thenReturn(deletedRowPos); + + Expression filter = + Expressions.or( + // page-1 in row group 0 -> row ranges: [100, 199] + Expressions.and( + Expressions.greaterThanOrEqual("l", 150), Expressions.lessThan("l", 200)), + // firstRowIndex in row group 1 -> 619 + // page-1, page-2 in row group 1 -> row ranges: [100, 299] + Expressions.and( + Expressions.greaterThanOrEqual("l", 750), Expressions.lessThan("l", 850))); + + List expected = + selectRecords( + allRecords, Pair.of(100, 180), Pair.of(181, 200), Pair.of(719, 800), Pair.of(801, 919)); + + readAndValidate(filter, expected, deleteFilter); + } + + private void readAndValidate(Expression filter, List expected) { + Parquet.ReadBuilder builder = + Parquet.read(Files.localInput(testFile)).project(DATA_SCHEMA).filter(filter); + CloseableIterable batches = + builder + .createBatchedReaderFunc( + type -> + VectorizedSparkParquetReaders.buildReader(DATA_SCHEMA, type, ImmutableMap.of())) + .build(); + Iterator expectedIter = expected.iterator(); + for (ColumnarBatch batch : batches) { + GenericsHelpers.assertEqualsBatch( + SUPPORTED_PRIMITIVES, expectedIter, batch, null, batch.numRows()); + } + } + + private void readAndValidate( + Expression filter, List expected, DeleteFilter deleteFilter) { + Parquet.ReadBuilder builder = + Parquet.read(Files.localInput(testFile)).project(DATA_SCHEMA).filter(filter); + builder = + builder.createBatchedReaderFunc( + type -> + VectorizedSparkParquetReaders.buildReader( + deleteFilter.requiredSchema(), type, ImmutableMap.of())); + CloseableIterable batches = + BatchReaderUtil.applyDeleteFilter(builder.build(), deleteFilter); + + Iterator expectedIter = expected.iterator(); + for (ColumnarBatch batch : batches) { + GenericsHelpers.assertEqualsBatch( + SUPPORTED_PRIMITIVES, expectedIter, batch, null, batch.numRows()); + } + } + + private List selectRecords(List records, Pair... ranges) { + return Arrays.stream(ranges) + .map(range -> records.subList(range.first(), range.second())) + .flatMap(Collection::stream) + .collect(Collectors.toList()); + } + + @Override + protected void writeAndValidate(Schema schema) throws IOException {} +} diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetPageSkippingVectorizedReads.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetPageSkippingVectorizedReads.java new file mode 100644 index 000000000000..0163426214ba --- /dev/null +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetPageSkippingVectorizedReads.java @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.data.vectorized.parquet; + +import static org.apache.iceberg.TableProperties.PARQUET_PAGE_ROW_LIMIT; +import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.iceberg.Files; +import org.apache.iceberg.MetadataColumns; +import org.apache.iceberg.Schema; +import org.apache.iceberg.data.DeleteFilter; +import org.apache.iceberg.data.RandomGenericData; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.data.parquet.GenericParquetWriter; +import org.apache.iceberg.deletes.PositionDeleteIndex; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.spark.data.AvroDataTestBase; +import org.apache.iceberg.spark.data.GenericsHelpers; +import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders; +import org.apache.iceberg.spark.source.BatchReaderUtil; +import org.apache.iceberg.util.Pair; +import org.apache.spark.sql.vectorized.ColumnarBatch; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +public class TestParquetPageSkippingVectorizedReads extends AvroDataTestBase { + + private static final Schema DATA_SCHEMA = new Schema(SUPPORTED_PRIMITIVES.fields()); + + @TempDir protected static Path temp; + + private static List allRecords = Lists.newArrayList(); + + /* Column and offset indexes info of `l` column in `testFile` copied from text printed by parquet-cli's + column-index command: + row-group 0: + column index for column l: + null count min max + page-0 0 0 99 + page-1 0 100 199 + page-2 0 200 299 + page-3 0 300 399 + page-4 0 400 499 + page-5 0 500 599 + page-6 0 600 618 + offset index for column l: + offset compressed size first row index + page-0 21370 213 0 + page-1 21583 213 100 + page-2 21796 213 200 + page-3 22009 216 300 + page-4 22225 217 400 + page-5 22442 216 500 + page-6 22658 87 600 + row-group 1: + column index for column l: + null count min max + page-0 0 619 718 + page-1 0 719 818 + page-2 0 819 918 + page-3 0 919 999 + offset index for column l: + offset compressed size first row index + page-0 117388 217 0 + page-1 117605 216 100 + page-2 117821 217 200 + page-3 118038 188 300 + */ + private static long index = -1; + private static File testFile; + + @BeforeAll + public static void generateFile() throws IOException { + Function transform = + record -> { + index += 1; + record.setField("l", index); + return record; + }; + int numRecords = 1000; + allRecords = + RandomGenericData.generate(DATA_SCHEMA, numRecords, 0L).stream() + .map(transform) + .collect(Collectors.toList()); + testFile = File.createTempFile("junit", null, temp.toFile()); + assertThat(testFile.delete()).as("Delete should succeed").isTrue(); + try (FileAppender fileAppender = + Parquet.write(Files.localOutput(testFile)) + .schema(DATA_SCHEMA) + .createWriterFunc(GenericParquetWriter::create) + .set(PARQUET_PAGE_ROW_LIMIT, "100") + .set(PARQUET_ROW_GROUP_SIZE_BYTES, "80000") // 2 row groups + .named("page-skipping-test") + .build()) { + fileAppender.addAll(allRecords.iterator()); + } + } + + @Test + public void testSinglePageMatch() { + Expression filter = + Expressions.and( + Expressions.greaterThanOrEqual("l", 150), + Expressions.lessThan("l", 200)); // page-1 in row group 0 -> row ranges: [100, 199] + + List expected = selectRecords(allRecords, Pair.of(100, 200)); + readAndValidate(filter, expected); + } + + @Test + public void testMultiplePagesMatch() { + Expression filter = + Expressions.or( + // page-1 in row group 0 -> row ranges: [100, 199] + Expressions.and( + Expressions.greaterThanOrEqual("l", 150), Expressions.lessThan("l", 200)), + // page-3, page-4 in row group 0 -> row ranges[300, 499] + Expressions.and( + Expressions.greaterThanOrEqual("l", 350), Expressions.lessThan("l", 450))); + + List expected = selectRecords(allRecords, Pair.of(100, 200), Pair.of(300, 500)); + readAndValidate(filter, expected); + } + + @Test + public void testMultipleRowGroupsMatch() { + Expression filter = + Expressions.or( + // page-1 in row group 0 -> row ranges: [100, 199] + Expressions.and( + Expressions.greaterThanOrEqual("l", 150), Expressions.lessThan("l", 200)), + // page-3, page-4 in row group 0 -> row ranges[300, 499] + Expressions.and( + Expressions.greaterThanOrEqual("l", 350), Expressions.lessThan("l", 450))); + filter = + Expressions.or( + filter, + // firstRowIndex in row group 1 -> 619 + // page-1, page-2 in row group 1 -> row ranges: [100, 299] + Expressions.and( + Expressions.greaterThanOrEqual("l", 750), Expressions.lessThan("l", 850))); + + List expected = + selectRecords(allRecords, Pair.of(100, 200), Pair.of(300, 500), Pair.of(719, 919)); + readAndValidate(filter, expected); + } + + @Test + public void testNoRowsMatch() { + Expression filter = Expressions.greaterThan("l", 1000); + readAndValidate(filter, ImmutableList.of()); + } + + @Test + public void testAllRowsMatch() { + Expression filter = Expressions.greaterThanOrEqual("l", Long.MIN_VALUE); + readAndValidate(filter, allRecords); + } + + @Test + public void testPageSkippingWithDelete() { + Schema projectSchema = + new Schema( + Stream.concat(DATA_SCHEMA.columns().stream(), Stream.of(MetadataColumns.ROW_POSITION)) + .collect(Collectors.toList())); + + DeleteFilter deleteFilter = mock(DeleteFilter.class); + when(deleteFilter.requiredSchema()).thenReturn(projectSchema); + when(deleteFilter.hasPosDeletes()).thenReturn(true); + when(deleteFilter.eqDeletedRowFilter()).thenReturn(null); + PositionDeleteIndex deletedRowPos = mock(PositionDeleteIndex.class); + + // pos 180, 800 are deleted in file + when(deletedRowPos.isDeleted(180L)).thenReturn(true); + when(deletedRowPos.isDeleted(800L)).thenReturn(true); + when(deleteFilter.deletedRowPositions()).thenReturn(deletedRowPos); + + Expression filter = + Expressions.or( + // page-1 in row group 0 -> row ranges: [100, 199] + Expressions.and( + Expressions.greaterThanOrEqual("l", 150), Expressions.lessThan("l", 200)), + // firstRowIndex in row group 1 -> 619 + // page-1, page-2 in row group 1 -> row ranges: [100, 299] + Expressions.and( + Expressions.greaterThanOrEqual("l", 750), Expressions.lessThan("l", 850))); + + List expected = + selectRecords( + allRecords, Pair.of(100, 180), Pair.of(181, 200), Pair.of(719, 800), Pair.of(801, 919)); + + readAndValidate(filter, expected, deleteFilter); + } + + private void readAndValidate(Expression filter, List expected) { + Parquet.ReadBuilder builder = + Parquet.read(Files.localInput(testFile)).project(DATA_SCHEMA).filter(filter); + CloseableIterable batches = + builder + .createBatchedReaderFunc( + type -> + VectorizedSparkParquetReaders.buildReader(DATA_SCHEMA, type, ImmutableMap.of())) + .build(); + Iterator expectedIter = expected.iterator(); + for (ColumnarBatch batch : batches) { + GenericsHelpers.assertEqualsBatch( + SUPPORTED_PRIMITIVES, expectedIter, batch, null, batch.numRows()); + } + } + + private void readAndValidate( + Expression filter, List expected, DeleteFilter deleteFilter) { + Parquet.ReadBuilder builder = + Parquet.read(Files.localInput(testFile)).project(DATA_SCHEMA).filter(filter); + builder = + builder.createBatchedReaderFunc( + type -> + VectorizedSparkParquetReaders.buildReader( + deleteFilter.requiredSchema(), type, ImmutableMap.of())); + CloseableIterable batches = + BatchReaderUtil.applyDeleteFilter(builder.build(), deleteFilter); + + Iterator expectedIter = expected.iterator(); + for (ColumnarBatch batch : batches) { + GenericsHelpers.assertEqualsBatch( + SUPPORTED_PRIMITIVES, expectedIter, batch, null, batch.numRows()); + } + } + + private List selectRecords(List records, Pair... ranges) { + return Arrays.stream(ranges) + .map(range -> records.subList(range.first(), range.second())) + .flatMap(Collection::stream) + .collect(Collectors.toList()); + } + + @Override + protected void writeAndValidate(Schema schema) throws IOException {} +} diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetPageSkippingVectorizedReads.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetPageSkippingVectorizedReads.java new file mode 100644 index 000000000000..4c9f2f399ffa --- /dev/null +++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetPageSkippingVectorizedReads.java @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.data.vectorized.parquet; + +import static org.apache.iceberg.TableProperties.PARQUET_PAGE_ROW_LIMIT; +import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.iceberg.Files; +import org.apache.iceberg.MetadataColumns; +import org.apache.iceberg.Schema; +import org.apache.iceberg.data.DeleteFilter; +import org.apache.iceberg.data.RandomGenericData; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.data.parquet.GenericParquetWriter; +import org.apache.iceberg.deletes.PositionDeleteIndex; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.spark.data.AvroDataTestBase; +import org.apache.iceberg.spark.data.GenericsHelpers; +import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders; +import org.apache.iceberg.spark.source.BatchReaderUtil; +import org.apache.iceberg.util.Pair; +import org.apache.spark.sql.vectorized.ColumnarBatch; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +public class TestParquetPageSkippingVectorizedReads extends AvroDataTestBase { + + private static final Schema DATA_SCHEMA = new Schema(SUPPORTED_PRIMITIVES.fields()); + + @TempDir protected static Path temp; + + private static List allRecords = Lists.newArrayList(); + + /* Column and offset indexes info of `l` column in `testFile` copied from text printed by parquet-cli's + column-index command: + row-group 0: + column index for column l: + null count min max + page-0 0 0 99 + page-1 0 100 199 + page-2 0 200 299 + page-3 0 300 399 + page-4 0 400 499 + page-5 0 500 599 + page-6 0 600 612 + offset index for column l: + offset compressed size first row index + page-0 21230 213 0 + page-1 21443 213 100 + page-2 21656 213 200 + page-3 21869 216 300 + page-4 22085 217 400 + page-5 22302 216 500 + page-6 22518 77 600 + row-group 1: + column index for column l: + null count min max + page-0 0 613 712 + page-1 0 713 812 + page-2 0 813 912 + page-3 0 913 999 + offset index for column l: + offset compressed size first row index + page-0 117066 217 0 + page-1 117283 216 100 + page-2 117499 217 200 + page-3 117716 197 300 + */ + private static long index = -1; + private static File testFile; + + @BeforeAll + public static void generateFile() throws IOException { + Function transform = + record -> { + index += 1; + record.setField("l", index); + return record; + }; + int numRecords = 1000; + allRecords = + RandomGenericData.generate(DATA_SCHEMA, numRecords, 0L).stream() + .map(transform) + .collect(Collectors.toList()); + testFile = File.createTempFile("junit", null, temp.toFile()); + assertThat(testFile.delete()).as("Delete should succeed").isTrue(); + try (FileAppender fileAppender = + Parquet.write(Files.localOutput(testFile)) + .schema(DATA_SCHEMA) + .createWriterFunc(GenericParquetWriter::create) + .set(PARQUET_PAGE_ROW_LIMIT, "100") + .set(PARQUET_ROW_GROUP_SIZE_BYTES, "80000") // 2 row groups + .named("page-skipping-test") + .build()) { + fileAppender.addAll(allRecords.iterator()); + } + } + + @Test + public void testSinglePageMatch() { + Expression filter = + Expressions.and( + Expressions.greaterThanOrEqual("l", 150), + Expressions.lessThan("l", 200)); // page-1 in row group 0 -> row ranges: [100, 199] + + List expected = selectRecords(allRecords, Pair.of(100, 200)); + readAndValidate(filter, expected); + } + + @Test + public void testMultiplePagesMatch() { + Expression filter = + Expressions.or( + // page-1 in row group 0 -> row ranges: [100, 199] + Expressions.and( + Expressions.greaterThanOrEqual("l", 150), Expressions.lessThan("l", 200)), + // page-3, page-4 in row group 0 -> row ranges[300, 499] + Expressions.and( + Expressions.greaterThanOrEqual("l", 350), Expressions.lessThan("l", 450))); + + List expected = selectRecords(allRecords, Pair.of(100, 200), Pair.of(300, 500)); + readAndValidate(filter, expected); + } + + @Test + public void testMultipleRowGroupsMatch() { + Expression filter = + Expressions.or( + // page-1 in row group 0 -> row ranges: [100, 199] + Expressions.and( + Expressions.greaterThanOrEqual("l", 150), Expressions.lessThan("l", 200)), + // page-3, page-4 in row group 0 -> row ranges[300, 499] + Expressions.and( + Expressions.greaterThanOrEqual("l", 350), Expressions.lessThan("l", 450))); + filter = + Expressions.or( + filter, + // firstRowIndex in row group 1 -> 613 + // page-1, page-2 in row group 1 -> row ranges: [100, 299] + Expressions.and( + Expressions.greaterThanOrEqual("l", 750), Expressions.lessThan("l", 850))); + + List expected = + selectRecords(allRecords, Pair.of(100, 200), Pair.of(300, 500), Pair.of(713, 913)); + readAndValidate(filter, expected); + } + + @Test + public void testNoRowsMatch() { + Expression filter = Expressions.greaterThan("l", 1000); + readAndValidate(filter, ImmutableList.of()); + } + + @Test + public void testAllRowsMatch() { + Expression filter = Expressions.greaterThanOrEqual("l", Long.MIN_VALUE); + readAndValidate(filter, allRecords); + } + + @Test + public void testPageSkippingWithDelete() { + Schema projectSchema = + new Schema( + Stream.concat(DATA_SCHEMA.columns().stream(), Stream.of(MetadataColumns.ROW_POSITION)) + .collect(Collectors.toList())); + + DeleteFilter deleteFilter = mock(DeleteFilter.class); + when(deleteFilter.requiredSchema()).thenReturn(projectSchema); + when(deleteFilter.hasPosDeletes()).thenReturn(true); + when(deleteFilter.eqDeletedRowFilter()).thenReturn(null); + PositionDeleteIndex deletedRowPos = mock(PositionDeleteIndex.class); + + // pos 180, 800 are deleted in file + when(deletedRowPos.isDeleted(180L)).thenReturn(true); + when(deletedRowPos.isDeleted(800L)).thenReturn(true); + when(deleteFilter.deletedRowPositions()).thenReturn(deletedRowPos); + + Expression filter = + Expressions.or( + // page-1 in row group 0 -> row ranges: [100, 199] + Expressions.and( + Expressions.greaterThanOrEqual("l", 150), Expressions.lessThan("l", 200)), + // firstRowIndex in row group 1 -> 613 + // page-1, page-2 in row group 1 -> row ranges: [100, 299] + Expressions.and( + Expressions.greaterThanOrEqual("l", 750), Expressions.lessThan("l", 850))); + + List expected = + selectRecords( + allRecords, Pair.of(100, 180), Pair.of(181, 200), Pair.of(713, 800), Pair.of(801, 913)); + + readAndValidate(filter, expected, deleteFilter); + } + + private void readAndValidate(Expression filter, List expected) { + Parquet.ReadBuilder builder = + Parquet.read(Files.localInput(testFile)).project(DATA_SCHEMA).filter(filter); + CloseableIterable batches = + builder + .createBatchedReaderFunc( + type -> + VectorizedSparkParquetReaders.buildReader(DATA_SCHEMA, type, ImmutableMap.of())) + .build(); + Iterator expectedIter = expected.iterator(); + for (ColumnarBatch batch : batches) { + GenericsHelpers.assertEqualsBatch( + SUPPORTED_PRIMITIVES, expectedIter, batch, null, batch.numRows()); + } + } + + private void readAndValidate( + Expression filter, List expected, DeleteFilter deleteFilter) { + Parquet.ReadBuilder builder = + Parquet.read(Files.localInput(testFile)).project(DATA_SCHEMA).filter(filter); + builder = + builder.createBatchedReaderFunc( + type -> + VectorizedSparkParquetReaders.buildReader( + deleteFilter.requiredSchema(), type, ImmutableMap.of())); + CloseableIterable batches = + BatchReaderUtil.applyDeleteFilter(builder.build(), deleteFilter); + + Iterator expectedIter = expected.iterator(); + for (ColumnarBatch batch : batches) { + GenericsHelpers.assertEqualsBatch( + SUPPORTED_PRIMITIVES, expectedIter, batch, null, batch.numRows()); + } + } + + private List selectRecords(List records, Pair... ranges) { + return Arrays.stream(ranges) + .map(range -> records.subList(range.first(), range.second())) + .flatMap(Collection::stream) + .collect(Collectors.toList()); + } + + @Override + protected void writeAndValidate(Schema schema) throws IOException {} +} diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetPageSkippingVectorizedReads.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetPageSkippingVectorizedReads.java new file mode 100644 index 000000000000..4c9f2f399ffa --- /dev/null +++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetPageSkippingVectorizedReads.java @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.data.vectorized.parquet; + +import static org.apache.iceberg.TableProperties.PARQUET_PAGE_ROW_LIMIT; +import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.iceberg.Files; +import org.apache.iceberg.MetadataColumns; +import org.apache.iceberg.Schema; +import org.apache.iceberg.data.DeleteFilter; +import org.apache.iceberg.data.RandomGenericData; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.data.parquet.GenericParquetWriter; +import org.apache.iceberg.deletes.PositionDeleteIndex; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.spark.data.AvroDataTestBase; +import org.apache.iceberg.spark.data.GenericsHelpers; +import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders; +import org.apache.iceberg.spark.source.BatchReaderUtil; +import org.apache.iceberg.util.Pair; +import org.apache.spark.sql.vectorized.ColumnarBatch; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +public class TestParquetPageSkippingVectorizedReads extends AvroDataTestBase { + + private static final Schema DATA_SCHEMA = new Schema(SUPPORTED_PRIMITIVES.fields()); + + @TempDir protected static Path temp; + + private static List allRecords = Lists.newArrayList(); + + /* Column and offset indexes info of `l` column in `testFile` copied from text printed by parquet-cli's + column-index command: + row-group 0: + column index for column l: + null count min max + page-0 0 0 99 + page-1 0 100 199 + page-2 0 200 299 + page-3 0 300 399 + page-4 0 400 499 + page-5 0 500 599 + page-6 0 600 612 + offset index for column l: + offset compressed size first row index + page-0 21230 213 0 + page-1 21443 213 100 + page-2 21656 213 200 + page-3 21869 216 300 + page-4 22085 217 400 + page-5 22302 216 500 + page-6 22518 77 600 + row-group 1: + column index for column l: + null count min max + page-0 0 613 712 + page-1 0 713 812 + page-2 0 813 912 + page-3 0 913 999 + offset index for column l: + offset compressed size first row index + page-0 117066 217 0 + page-1 117283 216 100 + page-2 117499 217 200 + page-3 117716 197 300 + */ + private static long index = -1; + private static File testFile; + + @BeforeAll + public static void generateFile() throws IOException { + Function transform = + record -> { + index += 1; + record.setField("l", index); + return record; + }; + int numRecords = 1000; + allRecords = + RandomGenericData.generate(DATA_SCHEMA, numRecords, 0L).stream() + .map(transform) + .collect(Collectors.toList()); + testFile = File.createTempFile("junit", null, temp.toFile()); + assertThat(testFile.delete()).as("Delete should succeed").isTrue(); + try (FileAppender fileAppender = + Parquet.write(Files.localOutput(testFile)) + .schema(DATA_SCHEMA) + .createWriterFunc(GenericParquetWriter::create) + .set(PARQUET_PAGE_ROW_LIMIT, "100") + .set(PARQUET_ROW_GROUP_SIZE_BYTES, "80000") // 2 row groups + .named("page-skipping-test") + .build()) { + fileAppender.addAll(allRecords.iterator()); + } + } + + @Test + public void testSinglePageMatch() { + Expression filter = + Expressions.and( + Expressions.greaterThanOrEqual("l", 150), + Expressions.lessThan("l", 200)); // page-1 in row group 0 -> row ranges: [100, 199] + + List expected = selectRecords(allRecords, Pair.of(100, 200)); + readAndValidate(filter, expected); + } + + @Test + public void testMultiplePagesMatch() { + Expression filter = + Expressions.or( + // page-1 in row group 0 -> row ranges: [100, 199] + Expressions.and( + Expressions.greaterThanOrEqual("l", 150), Expressions.lessThan("l", 200)), + // page-3, page-4 in row group 0 -> row ranges[300, 499] + Expressions.and( + Expressions.greaterThanOrEqual("l", 350), Expressions.lessThan("l", 450))); + + List expected = selectRecords(allRecords, Pair.of(100, 200), Pair.of(300, 500)); + readAndValidate(filter, expected); + } + + @Test + public void testMultipleRowGroupsMatch() { + Expression filter = + Expressions.or( + // page-1 in row group 0 -> row ranges: [100, 199] + Expressions.and( + Expressions.greaterThanOrEqual("l", 150), Expressions.lessThan("l", 200)), + // page-3, page-4 in row group 0 -> row ranges[300, 499] + Expressions.and( + Expressions.greaterThanOrEqual("l", 350), Expressions.lessThan("l", 450))); + filter = + Expressions.or( + filter, + // firstRowIndex in row group 1 -> 613 + // page-1, page-2 in row group 1 -> row ranges: [100, 299] + Expressions.and( + Expressions.greaterThanOrEqual("l", 750), Expressions.lessThan("l", 850))); + + List expected = + selectRecords(allRecords, Pair.of(100, 200), Pair.of(300, 500), Pair.of(713, 913)); + readAndValidate(filter, expected); + } + + @Test + public void testNoRowsMatch() { + Expression filter = Expressions.greaterThan("l", 1000); + readAndValidate(filter, ImmutableList.of()); + } + + @Test + public void testAllRowsMatch() { + Expression filter = Expressions.greaterThanOrEqual("l", Long.MIN_VALUE); + readAndValidate(filter, allRecords); + } + + @Test + public void testPageSkippingWithDelete() { + Schema projectSchema = + new Schema( + Stream.concat(DATA_SCHEMA.columns().stream(), Stream.of(MetadataColumns.ROW_POSITION)) + .collect(Collectors.toList())); + + DeleteFilter deleteFilter = mock(DeleteFilter.class); + when(deleteFilter.requiredSchema()).thenReturn(projectSchema); + when(deleteFilter.hasPosDeletes()).thenReturn(true); + when(deleteFilter.eqDeletedRowFilter()).thenReturn(null); + PositionDeleteIndex deletedRowPos = mock(PositionDeleteIndex.class); + + // pos 180, 800 are deleted in file + when(deletedRowPos.isDeleted(180L)).thenReturn(true); + when(deletedRowPos.isDeleted(800L)).thenReturn(true); + when(deleteFilter.deletedRowPositions()).thenReturn(deletedRowPos); + + Expression filter = + Expressions.or( + // page-1 in row group 0 -> row ranges: [100, 199] + Expressions.and( + Expressions.greaterThanOrEqual("l", 150), Expressions.lessThan("l", 200)), + // firstRowIndex in row group 1 -> 613 + // page-1, page-2 in row group 1 -> row ranges: [100, 299] + Expressions.and( + Expressions.greaterThanOrEqual("l", 750), Expressions.lessThan("l", 850))); + + List expected = + selectRecords( + allRecords, Pair.of(100, 180), Pair.of(181, 200), Pair.of(713, 800), Pair.of(801, 913)); + + readAndValidate(filter, expected, deleteFilter); + } + + private void readAndValidate(Expression filter, List expected) { + Parquet.ReadBuilder builder = + Parquet.read(Files.localInput(testFile)).project(DATA_SCHEMA).filter(filter); + CloseableIterable batches = + builder + .createBatchedReaderFunc( + type -> + VectorizedSparkParquetReaders.buildReader(DATA_SCHEMA, type, ImmutableMap.of())) + .build(); + Iterator expectedIter = expected.iterator(); + for (ColumnarBatch batch : batches) { + GenericsHelpers.assertEqualsBatch( + SUPPORTED_PRIMITIVES, expectedIter, batch, null, batch.numRows()); + } + } + + private void readAndValidate( + Expression filter, List expected, DeleteFilter deleteFilter) { + Parquet.ReadBuilder builder = + Parquet.read(Files.localInput(testFile)).project(DATA_SCHEMA).filter(filter); + builder = + builder.createBatchedReaderFunc( + type -> + VectorizedSparkParquetReaders.buildReader( + deleteFilter.requiredSchema(), type, ImmutableMap.of())); + CloseableIterable batches = + BatchReaderUtil.applyDeleteFilter(builder.build(), deleteFilter); + + Iterator expectedIter = expected.iterator(); + for (ColumnarBatch batch : batches) { + GenericsHelpers.assertEqualsBatch( + SUPPORTED_PRIMITIVES, expectedIter, batch, null, batch.numRows()); + } + } + + private List selectRecords(List records, Pair... ranges) { + return Arrays.stream(ranges) + .map(range -> records.subList(range.first(), range.second())) + .flatMap(Collection::stream) + .collect(Collectors.toList()); + } + + @Override + protected void writeAndValidate(Schema schema) throws IOException {} +} From 08a9767853a49908b4b5b04d26e62be0664be63c Mon Sep 17 00:00:00 2001 From: liu yang Date: Mon, 2 Feb 2026 09:35:45 +0800 Subject: [PATCH 2/5] using ParquetDecodingException instead of RuntimeException --- .../parquet/VectorizedPlainValuesReader.java | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPlainValuesReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPlainValuesReader.java index 840f1793b6c4..988110e170e0 100644 --- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPlainValuesReader.java +++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPlainValuesReader.java @@ -22,6 +22,7 @@ import java.nio.ByteBuffer; import org.apache.arrow.vector.FieldVector; import org.apache.iceberg.parquet.ValuesAsBytesReader; +import org.apache.parquet.io.ParquetDecodingException; import org.apache.parquet.io.api.Binary; class VectorizedPlainValuesReader extends ValuesAsBytesReader implements VectorizedValuesReader { @@ -59,7 +60,7 @@ public void skipBinary(int total) { getValuesInputStream().skipFully(len); } } catch (IOException e) { - throw new RuntimeException("Failed to skip " + total + " bytes", e); + throw new ParquetDecodingException("Failed to skip bytes", e); } } @@ -68,7 +69,7 @@ public void skipFixedSizeBinary(int total, int len) { try { getValuesInputStream().skipFully(total * (long) len); } catch (IOException e) { - throw new RuntimeException("Failed to skip " + total + " bytes", e); + throw new ParquetDecodingException("Failed to skip bytes", e); } } @@ -87,7 +88,7 @@ public void skipIntegers(int total) { try { getValuesInputStream().skipFully((long) total * INT_SIZE); } catch (IOException e) { - throw new RuntimeException(e); + throw new ParquetDecodingException("Failed to skip bytes", e); } } @@ -101,7 +102,7 @@ public void skipLongs(int total) { try { getValuesInputStream().skipFully((long) total * LONG_SIZE); } catch (IOException e) { - throw new RuntimeException("Failed to skip " + total + " bytes", e); + throw new ParquetDecodingException("Failed to skip bytes", e); } } @@ -115,7 +116,7 @@ public void skipFloats(int total) { try { getValuesInputStream().skipFully((long) total * FLOAT_SIZE); } catch (IOException e) { - throw new RuntimeException("Failed to skip " + total + " bytes", e); + throw new ParquetDecodingException("Failed to skip bytes", e); } } @@ -129,7 +130,7 @@ public void skipDoubles(int total) { try { getValuesInputStream().skipFully((long) total * DOUBLE_SIZE); } catch (IOException e) { - throw new RuntimeException("Failed to skip " + total + " bytes", e); + throw new ParquetDecodingException("Failed to skip bytes", e); } } From 3d2d0cb07af11eeea203c81c9917323c69a91ecb Mon Sep 17 00:00:00 2001 From: lurnagao-dahua Date: Mon, 2 Mar 2026 21:14:36 +0800 Subject: [PATCH 3/5] fix pos delete --- .../data/vectorized/ColumnarBatchUtil.java | 19 ++++++++++++------- .../iceberg/spark/source/BaseBatchReader.java | 7 +++---- ...estParquetPageSkippingVectorizedReads.java | 9 ++++----- .../data/vectorized/ColumnarBatchUtil.java | 19 ++++++++++++------- .../iceberg/spark/source/BaseBatchReader.java | 7 +++---- .../vectorized/TestColumnarBatchUtil.java | 4 ++++ ...estParquetPageSkippingVectorizedReads.java | 9 ++++----- .../data/vectorized/ColumnarBatchUtil.java | 19 ++++++++++++------- .../iceberg/spark/source/BaseBatchReader.java | 7 +++---- .../vectorized/TestColumnarBatchUtil.java | 4 ++++ ...estParquetPageSkippingVectorizedReads.java | 9 ++++----- .../data/vectorized/ColumnarBatchUtil.java | 19 ++++++++++++------- .../iceberg/spark/source/BaseBatchReader.java | 7 +++---- .../vectorized/TestColumnarBatchUtil.java | 4 ++++ ...estParquetPageSkippingVectorizedReads.java | 9 ++++----- 15 files changed, 88 insertions(+), 64 deletions(-) diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchUtil.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchUtil.java index 89fe4538bdc2..5730c5636c7f 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchUtil.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchUtil.java @@ -50,14 +50,14 @@ private ColumnarBatchUtil() {} * * @param columnVectors the array of column vectors for the batch * @param deletes the delete filter containing delete information - * @param rowStartPosInBatch the starting position of the row in the batch + * @param rowPositionColumnIndex the index of the row position column in the column vectors. * @param batchSize the size of the batch * @return the mapping array and the number of live rows, or {@code null} if nothing is deleted */ public static Pair buildRowIdMapping( ColumnVector[] columnVectors, DeleteFilter deletes, - long rowStartPosInBatch, + int rowPositionColumnIndex, int batchSize) { if (deletes == null) { return null; @@ -70,7 +70,10 @@ public static Pair buildRowIdMapping( int liveRowId = 0; for (int rowId = 0; rowId < batchSize; rowId++) { - long pos = rowStartPosInBatch + rowId; + long pos = + rowPositionColumnIndex == -1 + ? rowId - 1 + : columnVectors[rowPositionColumnIndex].getLong(rowId); row.rowId = rowId; if (isDeleted(pos, row, deletedPositions, eqDeleteFilter)) { deletes.incrementDeleteCount(); @@ -102,15 +105,14 @@ public static Pair buildRowIdMapping( * * @param columnVectors the array of column vectors for the batch. * @param deletes the delete filter containing information about which rows should be deleted. - * @param rowStartPosInBatch the starting position of the row in the batch, used to calculate the - * absolute position of the rows in the context of the entire dataset. + * @param rowPositionColumnIndex the index of the row position column in the column vectors. * @param batchSize the number of rows in the current batch. * @return an array of boolean values to indicate if a row is deleted or not */ public static boolean[] buildIsDeleted( ColumnVector[] columnVectors, DeleteFilter deletes, - long rowStartPosInBatch, + int rowPositionColumnIndex, int batchSize) { boolean[] isDeleted = new boolean[batchSize]; @@ -123,7 +125,10 @@ public static boolean[] buildIsDeleted( ColumnarBatchRow row = new ColumnarBatchRow(columnVectors); for (int rowId = 0; rowId < batchSize; rowId++) { - long pos = rowStartPosInBatch + rowId; + long pos = + rowPositionColumnIndex == -1 + ? rowId - 1 + : columnVectors[rowPositionColumnIndex].getLong(rowId); row.rowId = rowId; if (isDeleted(pos, row, deletedPositions, eqDeleteFilter)) { deletes.incrementDeleteCount(); diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java index ff30f29aeae6..45fdf57bebd6 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java @@ -181,12 +181,10 @@ ColumnarBatch filterBatch(ColumnarBatch batch) { } int numLiveRows = batch.numRows(); - long rowStartPosInBatch = - rowPositionColumnIndex == -1 ? -1 : vectors[rowPositionColumnIndex].getLong(0); if (hasIsDeletedColumn) { boolean[] isDeleted = - ColumnarBatchUtil.buildIsDeleted(vectors, deletes, rowStartPosInBatch, numLiveRows); + ColumnarBatchUtil.buildIsDeleted(vectors, deletes, rowPositionColumnIndex, numLiveRows); for (ColumnVector vector : vectors) { if (vector instanceof UpdatableDeletedColumnVector) { ((UpdatableDeletedColumnVector) vector).setValue(isDeleted); @@ -194,7 +192,8 @@ ColumnarBatch filterBatch(ColumnarBatch batch) { } } else { Pair pair = - ColumnarBatchUtil.buildRowIdMapping(vectors, deletes, rowStartPosInBatch, numLiveRows); + ColumnarBatchUtil.buildRowIdMapping( + vectors, deletes, rowPositionColumnIndex, numLiveRows); if (pair != null) { int[] rowIdMapping = pair.first(); numLiveRows = pair.second(); diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetPageSkippingVectorizedReads.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetPageSkippingVectorizedReads.java index 76b9781d37d9..9d38ec9acb11 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetPageSkippingVectorizedReads.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetPageSkippingVectorizedReads.java @@ -209,7 +209,7 @@ public void testPageSkippingWithDelete() { // pos 180, 800 are deleted in file when(deletedRowPos.isDeleted(180L)).thenReturn(true); - when(deletedRowPos.isDeleted(800L)).thenReturn(true); + when(deletedRowPos.isDeleted(380L)).thenReturn(true); when(deleteFilter.deletedRowPositions()).thenReturn(deletedRowPos); Expression filter = @@ -217,14 +217,13 @@ public void testPageSkippingWithDelete() { // page-1 in row group 0 -> row ranges: [100, 199] Expressions.and( Expressions.greaterThanOrEqual("l", 150), Expressions.lessThan("l", 200)), - // firstRowIndex in row group 1 -> 619 - // page-1, page-2 in row group 1 -> row ranges: [100, 299] + // page-3 in row group 0 -> row ranges: [300, 399] Expressions.and( - Expressions.greaterThanOrEqual("l", 750), Expressions.lessThan("l", 850))); + Expressions.greaterThanOrEqual("l", 350), Expressions.lessThan("l", 400))); List expected = selectRecords( - allRecords, Pair.of(100, 180), Pair.of(181, 200), Pair.of(719, 800), Pair.of(801, 919)); + allRecords, Pair.of(100, 180), Pair.of(181, 200), Pair.of(300, 380), Pair.of(381, 400)); readAndValidate(filter, expected, deleteFilter); } diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchUtil.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchUtil.java index 89fe4538bdc2..5730c5636c7f 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchUtil.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchUtil.java @@ -50,14 +50,14 @@ private ColumnarBatchUtil() {} * * @param columnVectors the array of column vectors for the batch * @param deletes the delete filter containing delete information - * @param rowStartPosInBatch the starting position of the row in the batch + * @param rowPositionColumnIndex the index of the row position column in the column vectors. * @param batchSize the size of the batch * @return the mapping array and the number of live rows, or {@code null} if nothing is deleted */ public static Pair buildRowIdMapping( ColumnVector[] columnVectors, DeleteFilter deletes, - long rowStartPosInBatch, + int rowPositionColumnIndex, int batchSize) { if (deletes == null) { return null; @@ -70,7 +70,10 @@ public static Pair buildRowIdMapping( int liveRowId = 0; for (int rowId = 0; rowId < batchSize; rowId++) { - long pos = rowStartPosInBatch + rowId; + long pos = + rowPositionColumnIndex == -1 + ? rowId - 1 + : columnVectors[rowPositionColumnIndex].getLong(rowId); row.rowId = rowId; if (isDeleted(pos, row, deletedPositions, eqDeleteFilter)) { deletes.incrementDeleteCount(); @@ -102,15 +105,14 @@ public static Pair buildRowIdMapping( * * @param columnVectors the array of column vectors for the batch. * @param deletes the delete filter containing information about which rows should be deleted. - * @param rowStartPosInBatch the starting position of the row in the batch, used to calculate the - * absolute position of the rows in the context of the entire dataset. + * @param rowPositionColumnIndex the index of the row position column in the column vectors. * @param batchSize the number of rows in the current batch. * @return an array of boolean values to indicate if a row is deleted or not */ public static boolean[] buildIsDeleted( ColumnVector[] columnVectors, DeleteFilter deletes, - long rowStartPosInBatch, + int rowPositionColumnIndex, int batchSize) { boolean[] isDeleted = new boolean[batchSize]; @@ -123,7 +125,10 @@ public static boolean[] buildIsDeleted( ColumnarBatchRow row = new ColumnarBatchRow(columnVectors); for (int rowId = 0; rowId < batchSize; rowId++) { - long pos = rowStartPosInBatch + rowId; + long pos = + rowPositionColumnIndex == -1 + ? rowId - 1 + : columnVectors[rowPositionColumnIndex].getLong(rowId); row.rowId = rowId; if (isDeleted(pos, row, deletedPositions, eqDeleteFilter)) { deletes.incrementDeleteCount(); diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java index ff30f29aeae6..45fdf57bebd6 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java @@ -181,12 +181,10 @@ ColumnarBatch filterBatch(ColumnarBatch batch) { } int numLiveRows = batch.numRows(); - long rowStartPosInBatch = - rowPositionColumnIndex == -1 ? -1 : vectors[rowPositionColumnIndex].getLong(0); if (hasIsDeletedColumn) { boolean[] isDeleted = - ColumnarBatchUtil.buildIsDeleted(vectors, deletes, rowStartPosInBatch, numLiveRows); + ColumnarBatchUtil.buildIsDeleted(vectors, deletes, rowPositionColumnIndex, numLiveRows); for (ColumnVector vector : vectors) { if (vector instanceof UpdatableDeletedColumnVector) { ((UpdatableDeletedColumnVector) vector).setValue(isDeleted); @@ -194,7 +192,8 @@ ColumnarBatch filterBatch(ColumnarBatch batch) { } } else { Pair pair = - ColumnarBatchUtil.buildRowIdMapping(vectors, deletes, rowStartPosInBatch, numLiveRows); + ColumnarBatchUtil.buildRowIdMapping( + vectors, deletes, rowPositionColumnIndex, numLiveRows); if (pair != null) { int[] rowIdMapping = pair.first(); numLiveRows = pair.second(); diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/TestColumnarBatchUtil.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/TestColumnarBatchUtil.java index dadbe3e788b7..47bb844003ad 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/TestColumnarBatchUtil.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/TestColumnarBatchUtil.java @@ -297,6 +297,10 @@ private ColumnVector[] mockColumnVector() { when(intVector.getInt(3)).thenReturn(43); when(intVector.getInt(4)).thenReturn(44); + for (int i = 0; i < 200; i++) { + when(intVector.getLong(i)).thenReturn((long) i); + } + return new ColumnVector[] {intVector}; } } diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetPageSkippingVectorizedReads.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetPageSkippingVectorizedReads.java index 0163426214ba..d31b48feef71 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetPageSkippingVectorizedReads.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetPageSkippingVectorizedReads.java @@ -209,7 +209,7 @@ public void testPageSkippingWithDelete() { // pos 180, 800 are deleted in file when(deletedRowPos.isDeleted(180L)).thenReturn(true); - when(deletedRowPos.isDeleted(800L)).thenReturn(true); + when(deletedRowPos.isDeleted(380L)).thenReturn(true); when(deleteFilter.deletedRowPositions()).thenReturn(deletedRowPos); Expression filter = @@ -217,14 +217,13 @@ public void testPageSkippingWithDelete() { // page-1 in row group 0 -> row ranges: [100, 199] Expressions.and( Expressions.greaterThanOrEqual("l", 150), Expressions.lessThan("l", 200)), - // firstRowIndex in row group 1 -> 619 - // page-1, page-2 in row group 1 -> row ranges: [100, 299] + // page-3 in row group 0 -> row ranges: [300, 399] Expressions.and( - Expressions.greaterThanOrEqual("l", 750), Expressions.lessThan("l", 850))); + Expressions.greaterThanOrEqual("l", 350), Expressions.lessThan("l", 400))); List expected = selectRecords( - allRecords, Pair.of(100, 180), Pair.of(181, 200), Pair.of(719, 800), Pair.of(801, 919)); + allRecords, Pair.of(100, 180), Pair.of(181, 200), Pair.of(300, 380), Pair.of(381, 400)); readAndValidate(filter, expected, deleteFilter); } diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchUtil.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchUtil.java index 89fe4538bdc2..5730c5636c7f 100644 --- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchUtil.java +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchUtil.java @@ -50,14 +50,14 @@ private ColumnarBatchUtil() {} * * @param columnVectors the array of column vectors for the batch * @param deletes the delete filter containing delete information - * @param rowStartPosInBatch the starting position of the row in the batch + * @param rowPositionColumnIndex the index of the row position column in the column vectors. * @param batchSize the size of the batch * @return the mapping array and the number of live rows, or {@code null} if nothing is deleted */ public static Pair buildRowIdMapping( ColumnVector[] columnVectors, DeleteFilter deletes, - long rowStartPosInBatch, + int rowPositionColumnIndex, int batchSize) { if (deletes == null) { return null; @@ -70,7 +70,10 @@ public static Pair buildRowIdMapping( int liveRowId = 0; for (int rowId = 0; rowId < batchSize; rowId++) { - long pos = rowStartPosInBatch + rowId; + long pos = + rowPositionColumnIndex == -1 + ? rowId - 1 + : columnVectors[rowPositionColumnIndex].getLong(rowId); row.rowId = rowId; if (isDeleted(pos, row, deletedPositions, eqDeleteFilter)) { deletes.incrementDeleteCount(); @@ -102,15 +105,14 @@ public static Pair buildRowIdMapping( * * @param columnVectors the array of column vectors for the batch. * @param deletes the delete filter containing information about which rows should be deleted. - * @param rowStartPosInBatch the starting position of the row in the batch, used to calculate the - * absolute position of the rows in the context of the entire dataset. + * @param rowPositionColumnIndex the index of the row position column in the column vectors. * @param batchSize the number of rows in the current batch. * @return an array of boolean values to indicate if a row is deleted or not */ public static boolean[] buildIsDeleted( ColumnVector[] columnVectors, DeleteFilter deletes, - long rowStartPosInBatch, + int rowPositionColumnIndex, int batchSize) { boolean[] isDeleted = new boolean[batchSize]; @@ -123,7 +125,10 @@ public static boolean[] buildIsDeleted( ColumnarBatchRow row = new ColumnarBatchRow(columnVectors); for (int rowId = 0; rowId < batchSize; rowId++) { - long pos = rowStartPosInBatch + rowId; + long pos = + rowPositionColumnIndex == -1 + ? rowId - 1 + : columnVectors[rowPositionColumnIndex].getLong(rowId); row.rowId = rowId; if (isDeleted(pos, row, deletedPositions, eqDeleteFilter)) { deletes.incrementDeleteCount(); diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java index ff30f29aeae6..45fdf57bebd6 100644 --- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java @@ -181,12 +181,10 @@ ColumnarBatch filterBatch(ColumnarBatch batch) { } int numLiveRows = batch.numRows(); - long rowStartPosInBatch = - rowPositionColumnIndex == -1 ? -1 : vectors[rowPositionColumnIndex].getLong(0); if (hasIsDeletedColumn) { boolean[] isDeleted = - ColumnarBatchUtil.buildIsDeleted(vectors, deletes, rowStartPosInBatch, numLiveRows); + ColumnarBatchUtil.buildIsDeleted(vectors, deletes, rowPositionColumnIndex, numLiveRows); for (ColumnVector vector : vectors) { if (vector instanceof UpdatableDeletedColumnVector) { ((UpdatableDeletedColumnVector) vector).setValue(isDeleted); @@ -194,7 +192,8 @@ ColumnarBatch filterBatch(ColumnarBatch batch) { } } else { Pair pair = - ColumnarBatchUtil.buildRowIdMapping(vectors, deletes, rowStartPosInBatch, numLiveRows); + ColumnarBatchUtil.buildRowIdMapping( + vectors, deletes, rowPositionColumnIndex, numLiveRows); if (pair != null) { int[] rowIdMapping = pair.first(); numLiveRows = pair.second(); diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/TestColumnarBatchUtil.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/TestColumnarBatchUtil.java index dadbe3e788b7..47bb844003ad 100644 --- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/TestColumnarBatchUtil.java +++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/TestColumnarBatchUtil.java @@ -297,6 +297,10 @@ private ColumnVector[] mockColumnVector() { when(intVector.getInt(3)).thenReturn(43); when(intVector.getInt(4)).thenReturn(44); + for (int i = 0; i < 200; i++) { + when(intVector.getLong(i)).thenReturn((long) i); + } + return new ColumnVector[] {intVector}; } } diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetPageSkippingVectorizedReads.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetPageSkippingVectorizedReads.java index 4c9f2f399ffa..7c86116e56d5 100644 --- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetPageSkippingVectorizedReads.java +++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetPageSkippingVectorizedReads.java @@ -209,7 +209,7 @@ public void testPageSkippingWithDelete() { // pos 180, 800 are deleted in file when(deletedRowPos.isDeleted(180L)).thenReturn(true); - when(deletedRowPos.isDeleted(800L)).thenReturn(true); + when(deletedRowPos.isDeleted(380L)).thenReturn(true); when(deleteFilter.deletedRowPositions()).thenReturn(deletedRowPos); Expression filter = @@ -217,14 +217,13 @@ public void testPageSkippingWithDelete() { // page-1 in row group 0 -> row ranges: [100, 199] Expressions.and( Expressions.greaterThanOrEqual("l", 150), Expressions.lessThan("l", 200)), - // firstRowIndex in row group 1 -> 613 - // page-1, page-2 in row group 1 -> row ranges: [100, 299] + // page-3 in row group 0 -> row ranges: [300, 399] Expressions.and( - Expressions.greaterThanOrEqual("l", 750), Expressions.lessThan("l", 850))); + Expressions.greaterThanOrEqual("l", 350), Expressions.lessThan("l", 400))); List expected = selectRecords( - allRecords, Pair.of(100, 180), Pair.of(181, 200), Pair.of(713, 800), Pair.of(801, 913)); + allRecords, Pair.of(100, 180), Pair.of(181, 200), Pair.of(300, 380), Pair.of(381, 400)); readAndValidate(filter, expected, deleteFilter); } diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchUtil.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchUtil.java index 89fe4538bdc2..5730c5636c7f 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchUtil.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchUtil.java @@ -50,14 +50,14 @@ private ColumnarBatchUtil() {} * * @param columnVectors the array of column vectors for the batch * @param deletes the delete filter containing delete information - * @param rowStartPosInBatch the starting position of the row in the batch + * @param rowPositionColumnIndex the index of the row position column in the column vectors. * @param batchSize the size of the batch * @return the mapping array and the number of live rows, or {@code null} if nothing is deleted */ public static Pair buildRowIdMapping( ColumnVector[] columnVectors, DeleteFilter deletes, - long rowStartPosInBatch, + int rowPositionColumnIndex, int batchSize) { if (deletes == null) { return null; @@ -70,7 +70,10 @@ public static Pair buildRowIdMapping( int liveRowId = 0; for (int rowId = 0; rowId < batchSize; rowId++) { - long pos = rowStartPosInBatch + rowId; + long pos = + rowPositionColumnIndex == -1 + ? rowId - 1 + : columnVectors[rowPositionColumnIndex].getLong(rowId); row.rowId = rowId; if (isDeleted(pos, row, deletedPositions, eqDeleteFilter)) { deletes.incrementDeleteCount(); @@ -102,15 +105,14 @@ public static Pair buildRowIdMapping( * * @param columnVectors the array of column vectors for the batch. * @param deletes the delete filter containing information about which rows should be deleted. - * @param rowStartPosInBatch the starting position of the row in the batch, used to calculate the - * absolute position of the rows in the context of the entire dataset. + * @param rowPositionColumnIndex the index of the row position column in the column vectors. * @param batchSize the number of rows in the current batch. * @return an array of boolean values to indicate if a row is deleted or not */ public static boolean[] buildIsDeleted( ColumnVector[] columnVectors, DeleteFilter deletes, - long rowStartPosInBatch, + int rowPositionColumnIndex, int batchSize) { boolean[] isDeleted = new boolean[batchSize]; @@ -123,7 +125,10 @@ public static boolean[] buildIsDeleted( ColumnarBatchRow row = new ColumnarBatchRow(columnVectors); for (int rowId = 0; rowId < batchSize; rowId++) { - long pos = rowStartPosInBatch + rowId; + long pos = + rowPositionColumnIndex == -1 + ? rowId - 1 + : columnVectors[rowPositionColumnIndex].getLong(rowId); row.rowId = rowId; if (isDeleted(pos, row, deletedPositions, eqDeleteFilter)) { deletes.incrementDeleteCount(); diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java index ff30f29aeae6..45fdf57bebd6 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java @@ -181,12 +181,10 @@ ColumnarBatch filterBatch(ColumnarBatch batch) { } int numLiveRows = batch.numRows(); - long rowStartPosInBatch = - rowPositionColumnIndex == -1 ? -1 : vectors[rowPositionColumnIndex].getLong(0); if (hasIsDeletedColumn) { boolean[] isDeleted = - ColumnarBatchUtil.buildIsDeleted(vectors, deletes, rowStartPosInBatch, numLiveRows); + ColumnarBatchUtil.buildIsDeleted(vectors, deletes, rowPositionColumnIndex, numLiveRows); for (ColumnVector vector : vectors) { if (vector instanceof UpdatableDeletedColumnVector) { ((UpdatableDeletedColumnVector) vector).setValue(isDeleted); @@ -194,7 +192,8 @@ ColumnarBatch filterBatch(ColumnarBatch batch) { } } else { Pair pair = - ColumnarBatchUtil.buildRowIdMapping(vectors, deletes, rowStartPosInBatch, numLiveRows); + ColumnarBatchUtil.buildRowIdMapping( + vectors, deletes, rowPositionColumnIndex, numLiveRows); if (pair != null) { int[] rowIdMapping = pair.first(); numLiveRows = pair.second(); diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/TestColumnarBatchUtil.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/TestColumnarBatchUtil.java index dadbe3e788b7..47bb844003ad 100644 --- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/TestColumnarBatchUtil.java +++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/TestColumnarBatchUtil.java @@ -297,6 +297,10 @@ private ColumnVector[] mockColumnVector() { when(intVector.getInt(3)).thenReturn(43); when(intVector.getInt(4)).thenReturn(44); + for (int i = 0; i < 200; i++) { + when(intVector.getLong(i)).thenReturn((long) i); + } + return new ColumnVector[] {intVector}; } } diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetPageSkippingVectorizedReads.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetPageSkippingVectorizedReads.java index 4c9f2f399ffa..7c86116e56d5 100644 --- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetPageSkippingVectorizedReads.java +++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetPageSkippingVectorizedReads.java @@ -209,7 +209,7 @@ public void testPageSkippingWithDelete() { // pos 180, 800 are deleted in file when(deletedRowPos.isDeleted(180L)).thenReturn(true); - when(deletedRowPos.isDeleted(800L)).thenReturn(true); + when(deletedRowPos.isDeleted(380L)).thenReturn(true); when(deleteFilter.deletedRowPositions()).thenReturn(deletedRowPos); Expression filter = @@ -217,14 +217,13 @@ public void testPageSkippingWithDelete() { // page-1 in row group 0 -> row ranges: [100, 199] Expressions.and( Expressions.greaterThanOrEqual("l", 150), Expressions.lessThan("l", 200)), - // firstRowIndex in row group 1 -> 613 - // page-1, page-2 in row group 1 -> row ranges: [100, 299] + // page-3 in row group 0 -> row ranges: [300, 399] Expressions.and( - Expressions.greaterThanOrEqual("l", 750), Expressions.lessThan("l", 850))); + Expressions.greaterThanOrEqual("l", 350), Expressions.lessThan("l", 400))); List expected = selectRecords( - allRecords, Pair.of(100, 180), Pair.of(181, 200), Pair.of(713, 800), Pair.of(801, 913)); + allRecords, Pair.of(100, 180), Pair.of(181, 200), Pair.of(300, 380), Pair.of(381, 400)); readAndValidate(filter, expected, deleteFilter); } From 58ec748894a569df2b7ffa06d29b943a2572ef7c Mon Sep 17 00:00:00 2001 From: liu yang Date: Tue, 3 Mar 2026 16:54:15 +0800 Subject: [PATCH 4/5] merge from main --- .../VectorizedDeltaEncodedValuesReader.java | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedDeltaEncodedValuesReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedDeltaEncodedValuesReader.java index b886d2908226..c0b8dc9b0715 100644 --- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedDeltaEncodedValuesReader.java +++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedDeltaEncodedValuesReader.java @@ -122,17 +122,6 @@ public void skipIntegers(int total) { readValues(total, null, -1, INT_SIZE, (f, i, v) -> {}); } - @Override - public void readLongs(int total, FieldVector vec, int rowId) { - readValues(total, vec, rowId, LONG_SIZE, (f, i, v) -> f.getDataBuffer().setLong(i, v)); - } - - @Override - public void skipLongs(int total) { - readValues(total, null, -1, LONG_SIZE, (f, i, v) -> {}); - } - - /** DELTA_BINARY_PACKED only supports INT32 and INT64 */ int[] readIntegers(int total, int rowId) { int[] result = new int[total]; readValues( @@ -149,6 +138,11 @@ public void readLongs(int total, FieldVector vec, int rowId) { readValues(total, vec, rowId, LONG_SIZE, (f, i, v) -> f.getDataBuffer().setLong(i, v)); } + @Override + public void skipLongs(int total) { + readValues(total, null, -1, LONG_SIZE, (f, i, v) -> {}); + } + private void readValues( int total, FieldVector vec, int rowId, int typeWidth, IntegerOutputWriter outputWriter) { if (valuesRead + total > totalValueCount) { From 28e01f80915d18eeafe98f7f41e3b20ac31f936a Mon Sep 17 00:00:00 2001 From: liu yang Date: Wed, 4 Mar 2026 10:41:09 +0800 Subject: [PATCH 5/5] Remove unused getActualBatchSize method Removed the getActualBatchSize method as it is no longer needed. --- .../arrow/vectorized/parquet/VectorizedPageIterator.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPageIterator.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPageIterator.java index 412f6e4c84a6..b763186cf06c 100644 --- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPageIterator.java +++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPageIterator.java @@ -337,10 +337,6 @@ private static int byteStreamSplitElementSize(PrimitiveType type) { } } - private int getActualBatchSize(int expectedBatchSize) { - return Math.min(expectedBatchSize, triplesCount - triplesRead); - } - class FixedSizeBinaryPageReader extends BasePageReader { @Override protected void nextVal(