Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,15 @@ private VectorHolder() {
icebergType = null;
}

/**
 * Creates a holder around an already-populated vector.
 * <p>
 * Holders built this way carry no column descriptor and no dictionary, and are marked as not
 * dictionary encoded.
 *
 * @param vec the vector to expose through this holder
 * @param type the Iceberg type reported by this holder
 * @param nulls the nullability information reported by this holder
 */
private VectorHolder(FieldVector vec, Type type, NullabilityHolder nulls) {
  // Values supplied by the caller.
  this.vector = vec;
  this.icebergType = type;
  this.nullabilityHolder = nulls;

  // Descriptor and dictionary state are left unset for this kind of holder.
  this.columnDescriptor = null;
  this.dictionary = null;
  this.isDictionaryEncoded = false;
}

/**
 * Returns the column descriptor stored in this holder.
 *
 * @return the column descriptor; {@code null} for holders created from a vector, type and
 *         nullability holder only
 */
public ColumnDescriptor descriptor() {
  return this.columnDescriptor;
}
Expand Down Expand Up @@ -131,4 +140,10 @@ public Object getConstant() {
}
}

public static class PositionVectorHolder extends VectorHolder {
Copy link
Copy Markdown
Contributor

@shardulm94 shardulm94 Sep 15, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like technically this class is redundant since the user can use VectorHolder directly, but is probably good for readability?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I added a private VectorHolder constructor for PositionVectorHolder, which I don't want others to use directly.

/**
 * Creates a holder by passing the arguments unchanged to the enclosing class's private
 * {@code VectorHolder(FieldVector, Type, NullabilityHolder)} constructor.
 *
 * @param vector the vector to expose through this holder
 * @param type the Iceberg type reported by this holder
 * @param nulls the nullability information reported by this holder
 */
public PositionVectorHolder(FieldVector vector, Type type, NullabilityHolder nulls) {
super(vector, type, nulls);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.arrow.ArrowAllocation;
import org.apache.iceberg.arrow.ArrowSchemaUtil;
import org.apache.iceberg.arrow.vectorized.parquet.VectorizedColumnIterator;
import org.apache.iceberg.parquet.ParquetUtil;
Expand Down Expand Up @@ -313,7 +315,7 @@ private void allocateFieldVector(boolean dictionaryEncodedVector) {
}

@Override
public void setRowGroupInfo(PageReadStore source, Map<ColumnPath, ColumnChunkMetaData> metadata) {
public void setRowGroupInfo(PageReadStore source, Map<ColumnPath, ColumnChunkMetaData> metadata, long rowPosition) {
ColumnChunkMetaData chunkMetaData = metadata.get(ColumnPath.get(columnDescriptor.getPath()));
this.dictionary = vectorizedColumnIterator.setRowGroupInfo(
source.getPageReader(columnDescriptor),
Expand All @@ -336,6 +338,10 @@ public static VectorizedArrowReader nulls() {
return NullVectorReader.INSTANCE;
}

/**
 * Creates a reader for the row-position metadata column.
 * <p>
 * A new instance is returned on every call because the reader keeps per-row-group state
 * (the starting row position), unlike {@link #nulls()} which hands out a shared instance.
 *
 * @return a new position reader
 */
public static VectorizedArrowReader positions() {
  VectorizedArrowReader positionReader = new PositionVectorReader();
  return positionReader;
}

private static final class NullVectorReader extends VectorizedArrowReader {
private static final NullVectorReader INSTANCE = new NullVectorReader();

Expand All @@ -345,7 +351,7 @@ public VectorHolder read(VectorHolder reuse, int numValsToRead) {
}

@Override
public void setRowGroupInfo(PageReadStore source, Map<ColumnPath, ColumnChunkMetaData> metadata) {
public void setRowGroupInfo(PageReadStore source, Map<ColumnPath, ColumnChunkMetaData> metadata, long rowPosition) {
}

@Override
Expand All @@ -358,6 +364,47 @@ public void setBatchSize(int batchSize) {
}
}

/**
 * Reader for the row-position metadata column ({@link MetadataColumns#ROW_POSITION}).
 * <p>
 * It does not read any Parquet pages. Each batch is filled with consecutive positions, starting
 * at the offset supplied to {@link #setRowGroupInfo} and advancing by the batch length on every
 * call to {@link #read}.
 */
private static final class PositionVectorReader extends VectorizedArrowReader {
  // File position of the first row of the next batch to be produced.
  private long rowStart;
  private NullabilityHolder nulls;

  /**
   * Produces the next batch of row positions.
   *
   * @param reuse a holder returned by an earlier call, or {@code null}; only used to decide whether
   *              the existing nullability holder can be reset instead of re-created
   * @param numValsToRead number of positions to produce
   * @return a holder whose vector contains {@code numValsToRead} consecutive positions, none null
   */
  @Override
  public VectorHolder read(VectorHolder reuse, int numValsToRead) {
    Field arrowField = ArrowSchemaUtil.convert(MetadataColumns.ROW_POSITION);
    FieldVector vec = arrowField.createVector(ArrowAllocation.rootAllocator());

    // The vector is created fresh on every call, so it must be allocated and filled on every call,
    // including when the caller passes a holder to reuse.
    // NOTE(review): the vector inside `reuse` is neither reused nor released here - confirm who owns it.
    ((BigIntVector) vec).allocateNew(numValsToRead);
    for (int i = 0; i < numValsToRead; i += 1) {
      vec.getDataBuffer().setLong(i * Long.BYTES, rowStart + i);
    }

    // Guard against a non-null `reuse` arriving before any nullability holder has been created.
    if (reuse != null && nulls != null) {
      nulls.reset();
    } else {
      nulls = new NullabilityHolder(numValsToRead);
    }

    vec.setValueCount(numValsToRead);
    nulls.setNotNulls(0, numValsToRead);

    // Move past the rows just emitted so the next batch of this row group continues from here.
    rowStart += numValsToRead;

    return new VectorHolder.PositionVectorHolder(vec, MetadataColumns.ROW_POSITION.type(), nulls);
  }

  /**
   * Records the starting row position of the row group about to be read.
   *
   * @param source unused by this reader
   * @param metadata unused by this reader
   * @param rowPosition position of the row group's first row
   */
  @Override
  public void setRowGroupInfo(PageReadStore source, Map<ColumnPath, ColumnChunkMetaData> metadata, long rowPosition) {
    this.rowStart = rowPosition;
  }

  @Override
  public String toString() {
    return getClass().toString();
  }

  /** No-op: the batch length is taken from the argument to {@link #read} instead. */
  @Override
  public void setBatchSize(int batchSize) {
  }
}

/**
* A Dummy Vector Reader which doesn't actually read files, instead it returns a dummy
* VectorHolder which indicates the constant value which should be used for this column.
Expand All @@ -376,7 +423,7 @@ public VectorHolder read(VectorHolder reuse, int numValsToRead) {
}

@Override
public void setRowGroupInfo(PageReadStore source, Map<ColumnPath, ColumnChunkMetaData> metadata) {
public void setRowGroupInfo(PageReadStore source, Map<ColumnPath, ColumnChunkMetaData> metadata, long rowPosition) {
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ private static class FileIterator<T> implements CloseableIterator<T> {
private long nextRowGroupStart = 0;
private long valuesRead = 0;
private T last = null;
private final long[] rowGroupsStartRowPos;

FileIterator(ReadConf conf) {
this.reader = conf.reader();
Expand All @@ -109,6 +110,7 @@ private static class FileIterator<T> implements CloseableIterator<T> {
this.batchSize = conf.batchSize();
this.model.setBatchSize(this.batchSize);
this.columnChunkMetadata = conf.columnChunkMetadataForRowGroups();
this.rowGroupsStartRowPos = conf.startRowPositions();
}


Expand Down Expand Up @@ -149,7 +151,9 @@ private void advance() {
} catch (IOException e) {
throw new RuntimeIOException(e);
}
model.setRowGroupInfo(pages, columnChunkMetadata.get(nextRowGroup));

long rowPosition = rowGroupsStartRowPos[nextRowGroup];
model.setRowGroupInfo(pages, columnChunkMetadata.get(nextRowGroup), rowPosition);
nextRowGroupStart += pages.getRowCount();
nextRowGroup += 1;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,9 @@ public interface VectorizedReader<T> {
*
* @param pages row group information for all the columns
* @param metadata map of {@link ColumnPath} -&gt; {@link ColumnChunkMetaData} for the row group
* @param rowPosition the row group's row offset in the parquet file
*/
void setRowGroupInfo(PageReadStore pages, Map<ColumnPath, ColumnChunkMetaData> metadata);
void setRowGroupInfo(PageReadStore pages, Map<ColumnPath, ColumnChunkMetaData> metadata, long rowPosition);

/**
* Release any resources allocated.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,10 @@ private ArrowVectorAccessors() {
static ArrowVectorAccessor getVectorAccessor(VectorHolder holder) {
Dictionary dictionary = holder.dictionary();
boolean isVectorDictEncoded = holder.isDictionaryEncoded();
ColumnDescriptor desc = holder.descriptor();
FieldVector vector = holder.vector();
PrimitiveType primitive = desc.getPrimitiveType();
if (isVectorDictEncoded) {
ColumnDescriptor desc = holder.descriptor();
PrimitiveType primitive = desc.getPrimitiveType();
return getDictionaryVectorAccessor(dictionary, desc, vector, primitive);
} else {
return getPlainVectorAccessor(vector);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,11 @@ public ColumnarBatchReader(List<VectorizedReader<?>> readers) {
}

@Override
public final void setRowGroupInfo(PageReadStore pageStore, Map<ColumnPath, ColumnChunkMetaData> metaData) {
public final void setRowGroupInfo(PageReadStore pageStore, Map<ColumnPath, ColumnChunkMetaData> metaData,
long rowPosition) {
for (VectorizedArrowReader reader : readers) {
if (reader != null) {
reader.setRowGroupInfo(pageStore, metaData);
reader.setRowGroupInfo(pageStore, metaData, rowPosition);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Map;
import java.util.stream.IntStream;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.arrow.ArrowAllocation;
import org.apache.iceberg.arrow.vectorized.VectorizedArrowReader;
Expand Down Expand Up @@ -103,6 +104,8 @@ public VectorizedReader<?> message(
VectorizedReader<?> reader = readersById.get(id);
if (idToConstant.containsKey(id)) {
reorderedFields.add(new ConstantVectorReader(idToConstant.get(id)));
} else if (id == MetadataColumns.ROW_POSITION.fieldId()) {
reorderedFields.add(VectorizedArrowReader.positions());
} else if (reader != null) {
reorderedFields.add(reader);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.parquet.ParquetSchemaUtil;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders;
Expand All @@ -48,6 +49,7 @@
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.vectorized.ColumnarBatch;
import org.apache.spark.unsafe.types.UTF8String;
import org.junit.Assert;
import org.junit.Before;
Expand Down Expand Up @@ -106,17 +108,17 @@ public class TestSparkParquetReadMetadataColumns {
}

@Parameterized.Parameters(name = "vectorized = {0}")
// Vectorized parquet reads not currently supported for reads on tables
// with row position stored in the metadata column.
// https://github.com/apache/iceberg/issues/1540
public static Object[] parameters() {
return new Object[] { false };
public static Object[][] parameters() {
return new Object[][] {
new Object[] { false },
new Object[] { true }
};
}

@Rule
public TemporaryFolder temp = new TemporaryFolder();

private boolean vectorized;
private final boolean vectorized;
private File testFile;

public TestSparkParquetReadMetadataColumns(boolean vectorized) {
Expand Down Expand Up @@ -206,7 +208,7 @@ private void readAndValidate(Expression filter, Long splitStart, Long splitLengt
builder = builder.split(splitStart, splitLength);
}

try (CloseableIterable<InternalRow> reader = builder.build()) {
try (CloseableIterable<InternalRow> reader = vectorized ? batchesToRows(builder.build()) : builder.build()) {
final Iterator<InternalRow> actualRows = reader.iterator();

for (InternalRow internalRow : expected) {
Expand All @@ -217,4 +219,10 @@ private void readAndValidate(Expression filter, Long splitStart, Long splitLengt
Assert.assertFalse("Should not have extra rows", actualRows.hasNext());
}
}

/**
 * Flattens an iterable of columnar batches into an iterable over the rows of each batch, in order.
 * The batches are passed to {@code CloseableIterable.combine} together with the row view.
 *
 * @param batches the batches to flatten
 * @return a closeable iterable over the rows of every batch
 */
private CloseableIterable<InternalRow> batchesToRows(CloseableIterable<ColumnarBatch> batches) {
  // Present each batch as an Iterable whose iterator is the batch's row iterator.
  Iterable<Iterable<InternalRow>> rowsPerBatch =
      Iterables.transform(batches, batch -> (Iterable<InternalRow>) batch::rowIterator);
  Iterable<InternalRow> allRows = Iterables.concat(rowsPerBatch);
  return CloseableIterable.combine(allRows, batches);
}
}