diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorHolder.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorHolder.java index 14062ff6558f..a361b7a887ef 100644 --- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorHolder.java +++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorHolder.java @@ -63,6 +63,15 @@ private VectorHolder() { icebergType = null; } + private VectorHolder(FieldVector vec, Type type, NullabilityHolder nulls) { + columnDescriptor = null; + vector = vec; + isDictionaryEncoded = false; + dictionary = null; + nullabilityHolder = nulls; + icebergType = type; + } + public ColumnDescriptor descriptor() { return columnDescriptor; } @@ -131,4 +140,10 @@ public Object getConstant() { } } + public static class PositionVectorHolder extends VectorHolder { + public PositionVectorHolder(FieldVector vector, Type type, NullabilityHolder nulls) { + super(vector, type, nulls); + } + } + } diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java index 4eb8091c2815..d726707674c1 100644 --- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java +++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java @@ -33,6 +33,8 @@ import org.apache.arrow.vector.types.pojo.ArrowType; import org.apache.arrow.vector.types.pojo.Field; import org.apache.arrow.vector.types.pojo.FieldType; +import org.apache.iceberg.MetadataColumns; +import org.apache.iceberg.arrow.ArrowAllocation; import org.apache.iceberg.arrow.ArrowSchemaUtil; import org.apache.iceberg.arrow.vectorized.parquet.VectorizedColumnIterator; import org.apache.iceberg.parquet.ParquetUtil; @@ -313,7 +315,7 @@ private void allocateFieldVector(boolean dictionaryEncodedVector) { } @Override - public void setRowGroupInfo(PageReadStore source, Map metadata) { + public void setRowGroupInfo(PageReadStore source, Map metadata, long rowPosition) { ColumnChunkMetaData chunkMetaData = metadata.get(ColumnPath.get(columnDescriptor.getPath())); this.dictionary = vectorizedColumnIterator.setRowGroupInfo( source.getPageReader(columnDescriptor), @@ -336,6 +338,10 @@ public static VectorizedArrowReader nulls() { return NullVectorReader.INSTANCE; } + public static VectorizedArrowReader positions() { + return new PositionVectorReader(); + } + private static final class NullVectorReader extends VectorizedArrowReader { private static final NullVectorReader INSTANCE = new NullVectorReader(); @@ -345,7 +351,7 @@ public VectorHolder read(VectorHolder reuse, int numValsToRead) { } @Override - public void setRowGroupInfo(PageReadStore source, Map metadata) { + public void setRowGroupInfo(PageReadStore source, Map metadata, long rowPosition) { } @Override @@ -358,6 +364,47 @@ public void setBatchSize(int batchSize) { } } + private static final class PositionVectorReader extends VectorizedArrowReader { + private long rowStart; + private NullabilityHolder nulls; + + @Override + public VectorHolder read(VectorHolder reuse, int numValsToRead) { + Field arrowField = ArrowSchemaUtil.convert(MetadataColumns.ROW_POSITION); + FieldVector vec = arrowField.createVector(ArrowAllocation.rootAllocator()); + + if (reuse != null) { + vec.setValueCount(0); + nulls.reset(); + } else { + ((BigIntVector) vec).allocateNew(numValsToRead); + for (int i = 0; i < numValsToRead; i += 1) { + vec.getDataBuffer().setLong(i * Long.BYTES, rowStart + i); + } + nulls = new NullabilityHolder(numValsToRead); + } + + vec.setValueCount(numValsToRead); + nulls.setNotNulls(0, numValsToRead); + + return new VectorHolder.PositionVectorHolder(vec, MetadataColumns.ROW_POSITION.type(), nulls); + } + + @Override + public void setRowGroupInfo(PageReadStore source, Map metadata, long rowPosition) { + this.rowStart = rowPosition; + } + + @Override + public String toString() { + return getClass().toString(); + } + + @Override + public void setBatchSize(int batchSize) { + } + } + /** * A Dummy Vector Reader which doesn't actually read files, instead it returns a dummy * VectorHolder which indicates the constant value which should be used for this column. @@ -376,7 +423,7 @@ public VectorHolder read(VectorHolder reuse, int numValsToRead) { } @Override - public void setRowGroupInfo(PageReadStore source, Map metadata) { + public void setRowGroupInfo(PageReadStore source, Map metadata, long rowPosition) { } @Override diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java index 481012cb8bbe..d3e08480c500 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java @@ -99,6 +99,7 @@ private static class FileIterator implements CloseableIterator { private long nextRowGroupStart = 0; private long valuesRead = 0; private T last = null; + private final long[] rowGroupsStartRowPos; FileIterator(ReadConf conf) { this.reader = conf.reader(); @@ -109,6 +110,7 @@ private static class FileIterator implements CloseableIterator { this.batchSize = conf.batchSize(); this.model.setBatchSize(this.batchSize); this.columnChunkMetadata = conf.columnChunkMetadataForRowGroups(); + this.rowGroupsStartRowPos = conf.startRowPositions(); } @@ -149,7 +151,9 @@ private void advance() { } catch (IOException e) { throw new RuntimeIOException(e); } - model.setRowGroupInfo(pages, columnChunkMetadata.get(nextRowGroup)); + + long rowPosition = rowGroupsStartRowPos[nextRowGroup]; + model.setRowGroupInfo(pages, columnChunkMetadata.get(nextRowGroup), rowPosition); nextRowGroupStart += pages.getRowCount(); nextRowGroup += 1; } diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedReader.java index 1872b4a19b4c..aa8582dda755 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedReader.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedReader.java @@ -45,8 +45,9 @@ public interface VectorizedReader { * * @param pages row group information for all the columns * @param metadata map of {@link ColumnPath} -> {@link ColumnChunkMetaData} for the row group + * @param rowPosition the row group's row offset in the parquet file */ - void setRowGroupInfo(PageReadStore pages, Map metadata); + void setRowGroupInfo(PageReadStore pages, Map metadata, long rowPosition); /** * Release any resources allocated. diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ArrowVectorAccessors.java b/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ArrowVectorAccessors.java index 0e4d9ab0f2b0..058dfc4bdfe8 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ArrowVectorAccessors.java +++ b/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ArrowVectorAccessors.java @@ -56,10 +56,10 @@ private ArrowVectorAccessors() { static ArrowVectorAccessor getVectorAccessor(VectorHolder holder) { Dictionary dictionary = holder.dictionary(); boolean isVectorDictEncoded = holder.isDictionaryEncoded(); - ColumnDescriptor desc = holder.descriptor(); FieldVector vector = holder.vector(); - PrimitiveType primitive = desc.getPrimitiveType(); if (isVectorDictEncoded) { + ColumnDescriptor desc = holder.descriptor(); + PrimitiveType primitive = desc.getPrimitiveType(); return getDictionaryVectorAccessor(dictionary, desc, vector, primitive); } else { return getPlainVectorAccessor(vector); diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java b/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java index c76321ecd61d..90951603851d 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java +++ b/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java @@ -48,10 +48,11 @@ public ColumnarBatchReader(List> readers) { } @Override - public final void setRowGroupInfo(PageReadStore pageStore, Map metaData) { + public final void setRowGroupInfo(PageReadStore pageStore, Map metaData, + long rowPosition) { for (VectorizedArrowReader reader : readers) { if (reader != null) { - reader.setRowGroupInfo(pageStore, metaData); + reader.setRowGroupInfo(pageStore, metaData, rowPosition); } } } diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java b/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java index 2834135aa3e2..dfa8da2cc033 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java +++ b/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java @@ -23,6 +23,7 @@ import java.util.Map; import java.util.stream.IntStream; import org.apache.arrow.memory.BufferAllocator; +import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.Schema; import org.apache.iceberg.arrow.ArrowAllocation; import org.apache.iceberg.arrow.vectorized.VectorizedArrowReader; @@ -103,6 +104,8 @@ public VectorizedReader message( VectorizedReader reader = readersById.get(id); if (idToConstant.containsKey(id)) { reorderedFields.add(new ConstantVectorReader(idToConstant.get(id))); + } else if (id == MetadataColumns.ROW_POSITION.fieldId()) { + reorderedFields.add(VectorizedArrowReader.positions()); } else if (reader != null) { reorderedFields.add(reader); } else { diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java index 5f9494b3df94..f7de5ab4797d 100644 --- a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java +++ b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java @@ -36,6 +36,7 @@ import org.apache.iceberg.io.FileAppender; import org.apache.iceberg.parquet.Parquet; import org.apache.iceberg.parquet.ParquetSchemaUtil; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.spark.SparkSchemaUtil; import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders; @@ -48,6 +49,7 @@ import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.vectorized.ColumnarBatch; import org.apache.spark.unsafe.types.UTF8String; import org.junit.Assert; import org.junit.Before; @@ -106,17 +108,17 @@ public class TestSparkParquetReadMetadataColumns { } @Parameterized.Parameters(name = "vectorized = {0}") - // Vectorized parquet reads not currently supported for reads on tables - // with row position stored in the metadata column. - // https://github.com/apache/iceberg/issues/1540 - public static Object[] parameters() { - return new Object[] { false }; + public static Object[][] parameters() { + return new Object[][] { + new Object[] { false }, + new Object[] { true } + }; } @Rule public TemporaryFolder temp = new TemporaryFolder(); - private boolean vectorized; + private final boolean vectorized; private File testFile; public TestSparkParquetReadMetadataColumns(boolean vectorized) { @@ -206,7 +208,7 @@ private void readAndValidate(Expression filter, Long splitStart, Long splitLengt builder = builder.split(splitStart, splitLength); } - try (CloseableIterable reader = builder.build()) { + try (CloseableIterable reader = vectorized ? batchesToRows(builder.build()) : builder.build()) { final Iterator actualRows = reader.iterator(); for (InternalRow internalRow : expected) { @@ -217,4 +219,10 @@ private void readAndValidate(Expression filter, Long splitStart, Long splitLengt Assert.assertFalse("Should not have extra rows", actualRows.hasNext()); } } + + private CloseableIterable batchesToRows(CloseableIterable batches) { + return CloseableIterable.combine( + Iterables.concat(Iterables.transform(batches, b -> (Iterable) b::rowIterator)), + batches); + } }