diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java index 9459a4ff64f6..b89915bbac14 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java @@ -61,7 +61,7 @@ import org.apache.orc.impl.TreeReaderFactory.TreeReader; import org.apache.orc.impl.WriterImpl; import org.apache.orc.OrcProto; - +import org.apache.orc.impl.reader.tree.TypeReader; public class OrcEncodedDataConsumer extends EncodedDataConsumer { @@ -193,7 +193,7 @@ protected void decodeBatch(OrcEncodedColumnBatch batch, */ TreeReader reader = columnReaders[idx]; ColumnVector cv = prepareColumnVector(cvb, idx, batchSize); - reader.nextVector(cv, null, batchSize); + reader.nextVector(cv, null, batchSize, cvb.filterContext, TypeReader.ReadPhase.ALL); } // we are done reading a batch, send it to consumer for processing @@ -234,7 +234,7 @@ private void createColumnReaders(OrcEncodedColumnBatch batch, this.batchSchemas = includes.getBatchReaderTypes(fileSchema); StructTreeReader treeReader = EncodedTreeReaderFactory.createRootTreeReader( batchSchemas, stripeMetadata.getEncodings(), batch, codec, context, useDecimal64ColumnVectors); - this.columnReaders = treeReader.getChildReaders(); + this.columnReaders = (TreeReader[]) treeReader.getChildReaders(); if (LlapIoImpl.LOG.isDebugEnabled()) { for (int i = 0; i < columnReaders.length; ++i) { @@ -306,7 +306,7 @@ private void positionInStreams(TreeReaderFactory.TreeReader[] columnReaders, for (int i = 0; i < columnReaders.length; i++) { if (columnReaders[i] == null) continue; // TODO: we could/should trace seek destinations; pps needs a "peek" method - columnReaders[i].seek(pps); + columnReaders[i].seek(pps, TypeReader.ReadPhase.ALL); } } @@ -331,7 +331,7 @@ private void repositionInStreams(TreeReaderFactory.TreeReader[] columnReaders, ((EncodedTreeReaderFactory.TimestampStreamReader) reader) .updateTimezone(stripeMetadata.getWriterTimezone()); } - reader.seek(pps); + reader.seek(pps, TypeReader.ReadPhase.ALL); } } diff --git a/pom.xml b/pom.xml index 3f28653c6ee1..077477ea21c4 100644 --- a/pom.xml +++ b/pom.xml @@ -185,7 +185,7 @@ 42.2.14 21.3.0.0 2.3 - 1.6.9 + 1.7.2 3.4.4 2.0.2 2.0.0-M5 diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedTreeReaderFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedTreeReaderFactory.java index 9ad75ddc92f6..fe71f037a988 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedTreeReaderFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedTreeReaderFactory.java @@ -35,6 +35,8 @@ import org.apache.orc.impl.PositionProvider; import org.apache.orc.impl.TreeReaderFactory; import org.apache.orc.OrcProto; +import org.apache.orc.impl.reader.tree.TypeReader; +import org.apache.hadoop.hive.ql.io.filter.FilterContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -73,10 +75,13 @@ private TimestampStreamReader(int columnId, SettableUncompressedStream present, } @Override - public void nextVector( - ColumnVector previousVector, boolean[] isNull, int batchSize) throws IOException { + public void nextVector(ColumnVector previousVector, + boolean[] isNull, + final int batchSize, + FilterContext filterContext, + ReadPhase readPhase) throws IOException { if (vectors == null) { - super.nextVector(previousVector, isNull, batchSize); + super.nextVector(previousVector, isNull, batchSize, filterContext, readPhase); return; } // Note: we assume that batchSize will be consistent with vectors passed in. @@ -88,7 +93,7 @@ public void nextVector( } @Override - public void seek(PositionProvider index) throws IOException { + public void seek(PositionProvider index, ReadPhase phase) throws IOException { if (vectors != null) return; if (present != null) { if (isFileCompressed) { @@ -251,19 +256,19 @@ private StringStreamReader(int columnId, SettableUncompressedStream present, } @Override - public void seek(PositionProvider[] index) throws IOException { + public void seek(PositionProvider[] index, ReadPhase phase) throws IOException { // This string reader should simply redirect to its own seek (what other types already do). - this.seek(index[columnId]); + this.seek(index[columnId], phase); } @Override - public void seek(PositionProvider index) throws IOException { + public void seek(PositionProvider index, ReadPhase phase) throws IOException { if (vectors != null) return; if (present != null) { if (_isFileCompressed) { index.getNext(); } - reader.getPresent().seek(index); + reader.seek(index, phase); } if (_isDictionaryEncoding) { @@ -299,10 +304,13 @@ public void seek(PositionProvider index) throws IOException { } @Override - public void nextVector( - ColumnVector previousVector, boolean[] isNull, int batchSize) throws IOException { + public void nextVector(ColumnVector previousVector, + boolean[] isNull, + final int batchSize, + FilterContext filterContext, + ReadPhase readPhase) throws IOException { if (vectors == null) { - super.nextVector(previousVector, isNull, batchSize); + super.nextVector(previousVector, isNull, batchSize, filterContext, readPhase); return; } vectors.get(vectorIndex++).shallowCopyTo(previousVector); @@ -442,7 +450,7 @@ private ShortStreamReader(int columnId, SettableUncompressedStream present, } @Override - public void seek(PositionProvider index) throws IOException { + public void seek(PositionProvider index, ReadPhase phase) throws IOException { if (vectors != null) return; if (present != null) { if (isFileCompressed) { @@ -462,10 +470,13 @@ public void seek(PositionProvider index) throws IOException { } @Override - public void nextVector( - ColumnVector previousVector, boolean[] isNull, int batchSize) throws IOException { + public void nextVector(ColumnVector previousVector, + boolean[] isNull, + final int batchSize, + FilterContext filterContext, + ReadPhase readPhase) throws IOException { if (vectors == null) { - super.nextVector(previousVector, isNull, batchSize); + super.nextVector(previousVector, isNull, batchSize, filterContext, readPhase); return; } vectors.get(vectorIndex++).shallowCopyTo(previousVector); @@ -571,7 +582,7 @@ private LongStreamReader(int columnId, SettableUncompressedStream present, } @Override - public void seek(PositionProvider index) throws IOException { + public void seek(PositionProvider index, ReadPhase readPhase) throws IOException { if (vectors != null) return; if (present != null) { if (_isFileCompressed) { @@ -591,10 +602,13 @@ public void seek(PositionProvider index) throws IOException { } @Override - public void nextVector( - ColumnVector previousVector, boolean[] isNull, int batchSize) throws IOException { + public void nextVector(ColumnVector previousVector, + boolean[] isNull, + final int batchSize, + FilterContext filterContext, + ReadPhase readPhase) throws IOException { if (vectors == null) { - super.nextVector(previousVector, isNull, batchSize); + super.nextVector(previousVector, isNull, batchSize, filterContext, readPhase); return; } vectors.get(vectorIndex++).shallowCopyTo(previousVector); @@ -701,7 +715,7 @@ private IntStreamReader(int columnId, SettableUncompressedStream present, } @Override - public void seek(PositionProvider index) throws IOException { + public void seek(PositionProvider index, ReadPhase readPhase) throws IOException { if (vectors != null) return; if (present != null) { if (_isFileCompressed) { @@ -721,10 +735,13 @@ public void seek(PositionProvider index) throws IOException { } @Override - public void nextVector( - ColumnVector previousVector, boolean[] isNull, int batchSize) throws IOException { + public void nextVector(ColumnVector previousVector, + boolean[] isNull, + final int batchSize, + FilterContext filterContext, + ReadPhase readPhase) throws IOException { if (vectors == null) { - super.nextVector(previousVector, isNull, batchSize); + super.nextVector(previousVector, isNull, batchSize, filterContext, readPhase); return; } vectors.get(vectorIndex++).shallowCopyTo(previousVector); @@ -817,12 +834,12 @@ protected static class FloatStreamReader extends FloatTreeReader implements Sett private SettableUncompressedStream _presentStream; private SettableUncompressedStream _dataStream; private List vectors; - private int vectorIndex = 0; + private int vectorIndex = 0; private FloatStreamReader(int columnId, SettableUncompressedStream present, SettableUncompressedStream data, boolean isFileCompressed, - List vectors) throws IOException { - super(columnId, present, data); + List vectors, TreeReaderFactory.Context context) throws IOException { + super(columnId, present, data, context); this._isFileCompressed = isFileCompressed; this._presentStream = present; this._dataStream = data; @@ -830,7 +847,7 @@ private FloatStreamReader(int columnId, SettableUncompressedStream present, } @Override - public void seek(PositionProvider index) throws IOException { + public void seek(PositionProvider index, ReadPhase readPhase) throws IOException { if (vectors != null) return; if (present != null) { if (_isFileCompressed) { @@ -848,10 +865,13 @@ public void seek(PositionProvider index) throws IOException { } @Override - public void nextVector( - ColumnVector previousVector, boolean[] isNull, int batchSize) throws IOException { + public void nextVector(ColumnVector previousVector, + boolean[] isNull, + final int batchSize, + FilterContext filterContext, + ReadPhase readPhase) throws IOException { if (vectors == null) { - super.nextVector(previousVector, isNull, batchSize); + super.nextVector(previousVector, isNull, batchSize, filterContext, readPhase); return; } vectors.get(vectorIndex++).shallowCopyTo(previousVector); @@ -879,6 +899,7 @@ public static class StreamReaderBuilder { private ColumnStreamData dataStream; private CompressionCodec compressionCodec; private List vectors; + private TreeReaderFactory.Context context; public StreamReaderBuilder setColumnIndex(int columnIndex) { this.columnIndex = columnIndex; @@ -910,7 +931,12 @@ public FloatStreamReader build() throws IOException { dataStream); boolean isFileCompressed = compressionCodec != null; - return new FloatStreamReader(columnIndex, present, data, isFileCompressed, vectors); + return new FloatStreamReader(columnIndex, present, data, isFileCompressed, vectors, context); + } + + public StreamReaderBuilder setContext(Context context) { + this.context = context; + return this; } public StreamReaderBuilder setVectors(List vectors) { @@ -934,8 +960,8 @@ protected static class DoubleStreamReader extends DoubleTreeReader implements Se private DoubleStreamReader(int columnId, SettableUncompressedStream present, SettableUncompressedStream data, boolean isFileCompressed, - List vectors) throws IOException { - super(columnId, present, data); + List vectors, TreeReaderFactory.Context context) throws IOException { + super(columnId, present, data, context); this._isFileCompressed = isFileCompressed; this._presentStream = present; this._dataStream = data; @@ -943,7 +969,7 @@ private DoubleStreamReader(int columnId, SettableUncompressedStream present, } @Override - public void seek(PositionProvider index) throws IOException { + public void seek(PositionProvider index, ReadPhase readPhase) throws IOException { if (vectors != null) return; if (present != null) { if (_isFileCompressed) { @@ -961,10 +987,13 @@ public void seek(PositionProvider index) throws IOException { } @Override - public void nextVector( - ColumnVector previousVector, boolean[] isNull, int batchSize) throws IOException { + public void nextVector(ColumnVector previousVector, + boolean[] isNull, + final int batchSize, + FilterContext filterContext, + ReadPhase readPhase) throws IOException { if (vectors == null) { - super.nextVector(previousVector, isNull, batchSize); + super.nextVector(previousVector, isNull, batchSize, filterContext, readPhase); return; } vectors.get(vectorIndex++).shallowCopyTo(previousVector); @@ -1030,7 +1059,7 @@ public DoubleStreamReader build() throws IOException { boolean isFileCompressed = compressionCodec != null; // TODO: why doesn't this use context? - return new DoubleStreamReader(columnIndex, present, data, isFileCompressed, vectors); + return new DoubleStreamReader(columnIndex, present, data, isFileCompressed, vectors, context); } public StreamReaderBuilder setVectors(List vectors) { @@ -1068,7 +1097,7 @@ private DecimalStreamReader(int columnId, int precision, int scale, } @Override - public void seek(PositionProvider index) throws IOException { + public void seek(PositionProvider index, ReadPhase readPhase) throws IOException { if (vectors != null) return; if (present != null) { if (_isFileCompressed) { @@ -1095,10 +1124,13 @@ public void seek(PositionProvider index) throws IOException { } @Override - public void nextVector( - ColumnVector previousVector, boolean[] isNull, int batchSize) throws IOException { + public void nextVector(ColumnVector previousVector, + boolean[] isNull, + final int batchSize, + FilterContext filterContext, + ReadPhase readPhase) throws IOException { if (vectors == null) { - super.nextVector(previousVector, isNull, batchSize); + super.nextVector(previousVector, isNull, batchSize, filterContext, readPhase); return; } vectors.get(vectorIndex++).shallowCopyTo(previousVector); @@ -1229,7 +1261,7 @@ private Decimal64StreamReader(int columnId, int precision, int scale, } @Override - public void seek(PositionProvider index) throws IOException { + public void seek(PositionProvider index, ReadPhase readPhase) throws IOException { if (vectors != null) return; if (present != null) { if (_isFileCompressed) { @@ -1249,10 +1281,13 @@ public void seek(PositionProvider index) throws IOException { } @Override - public void nextVector( - ColumnVector previousVector, boolean[] isNull, int batchSize) throws IOException { + public void nextVector(ColumnVector previousVector, + boolean[] isNull, + final int batchSize, + FilterContext filterContext, + ReadPhase readPhase) throws IOException { if (vectors == null) { - super.nextVector(previousVector, isNull, batchSize); + super.nextVector(previousVector, isNull, batchSize, filterContext, readPhase); return; } vectors.get(vectorIndex++).shallowCopyTo(previousVector); @@ -1367,7 +1402,7 @@ private DateStreamReader(int columnId, SettableUncompressedStream present, } @Override - public void seek(PositionProvider index) throws IOException { + public void seek(PositionProvider index, ReadPhase readPhase) throws IOException { if (vectors != null) return; if (present != null) { if (isFileCompressed) { @@ -1387,10 +1422,13 @@ public void seek(PositionProvider index) throws IOException { } @Override - public void nextVector( - ColumnVector previousVector, boolean[] isNull, int batchSize) throws IOException { + public void nextVector(ColumnVector previousVector, + boolean[] isNull, + final int batchSize, + FilterContext filterContext, + ReadPhase readPhase) throws IOException { if (vectors == null) { - super.nextVector(previousVector, isNull, batchSize); + super.nextVector(previousVector, isNull, batchSize, filterContext, readPhase); return; } vectors.get(vectorIndex++).shallowCopyTo(previousVector); @@ -1491,9 +1529,9 @@ private CharStreamReader(int columnId, int maxLength, SettableUncompressedStream present, SettableUncompressedStream data, SettableUncompressedStream length, SettableUncompressedStream dictionary, boolean isFileCompressed, OrcProto.ColumnEncoding encoding, - List vectors) throws IOException { + List vectors, TreeReaderFactory.Context context) throws IOException { super(columnId, maxLength, present, data, length, - dictionary, encoding); + dictionary, encoding, context); this._isDictionaryEncoding = dictionary != null; this._isFileCompressed = isFileCompressed; this._presentStream = present; @@ -1504,19 +1542,19 @@ private CharStreamReader(int columnId, int maxLength, } @Override - public void seek(PositionProvider[] index) throws IOException { + public void seek(PositionProvider[] index, ReadPhase readPhase) throws IOException { // This string reader should simply redirect to its own seek (what other types already do). - this.seek(index[columnId]); + this.seek(index[columnId], readPhase); } @Override - public void seek(PositionProvider index) throws IOException { + public void seek(PositionProvider index, ReadPhase readPhase) throws IOException { if (vectors != null) return; if (present != null) { if (_isFileCompressed) { index.getNext(); } - reader.getPresent().seek(index); + reader.seek(index, readPhase); } if (_isDictionaryEncoding) { @@ -1551,10 +1589,13 @@ public void seek(PositionProvider index) throws IOException { } @Override - public void nextVector( - ColumnVector previousVector, boolean[] isNull, int batchSize) throws IOException { + public void nextVector(ColumnVector previousVector, + boolean[] isNull, + final int batchSize, + FilterContext filterContext, + ReadPhase readPhase) throws IOException { if (vectors == null) { - super.nextVector(previousVector, isNull, batchSize); + super.nextVector(previousVector, isNull, batchSize, filterContext, readPhase); return; } vectors.get(vectorIndex++).shallowCopyTo(previousVector); @@ -1601,6 +1642,7 @@ public static class StreamReaderBuilder { private CompressionCodec compressionCodec; private OrcProto.ColumnEncoding columnEncoding; private List vectors; + private TreeReaderFactory.Context context; public StreamReaderBuilder setColumnIndex(int columnIndex) { this.columnIndex = columnIndex; @@ -1660,7 +1702,12 @@ public CharStreamReader build() throws IOException { boolean isFileCompressed = compressionCodec != null; return new CharStreamReader(columnIndex, maxLength, present, data, length, - dictionary, isFileCompressed, columnEncoding, vectors); + dictionary, isFileCompressed, columnEncoding, vectors, context); + } + + public StreamReaderBuilder setContext(TreeReaderFactory.Context context) { + this.context = context; + return this; } public StreamReaderBuilder setVectors(List vectors) { @@ -1683,15 +1730,15 @@ protected static class VarcharStreamReader extends VarcharTreeReader implements private SettableUncompressedStream _lengthStream; private SettableUncompressedStream _dictionaryStream; private List vectors; - private int vectorIndex = 0; + private int vectorIndex = 0; private VarcharStreamReader(int columnId, int maxLength, SettableUncompressedStream present, SettableUncompressedStream data, SettableUncompressedStream length, SettableUncompressedStream dictionary, boolean isFileCompressed, OrcProto.ColumnEncoding encoding, - List vectors) throws IOException { + List vectors, TreeReaderFactory.Context context) throws IOException { super(columnId, maxLength, present, data, length, - dictionary, encoding); + dictionary, encoding, context); this._isDictionaryEncoding = dictionary != null; this._isFileCompressed = isFileCompressed; this._presentStream = present; @@ -1702,19 +1749,19 @@ private VarcharStreamReader(int columnId, int maxLength, } @Override - public void seek(PositionProvider[] index) throws IOException { + public void seek(PositionProvider[] index, ReadPhase readPhase) throws IOException { // This string reader should simply redirect to its own seek (what other types already do). - this.seek(index[columnId]); + this.seek(index[columnId], readPhase); } @Override - public void seek(PositionProvider index) throws IOException { + public void seek(PositionProvider index, ReadPhase readPhase) throws IOException { if (vectors != null) return; if (present != null) { if (_isFileCompressed) { index.getNext(); } - reader.getPresent().seek(index); + reader.seek(index, readPhase); } if (_isDictionaryEncoding) { @@ -1749,10 +1796,13 @@ public void seek(PositionProvider index) throws IOException { } @Override - public void nextVector( - ColumnVector previousVector, boolean[] isNull, int batchSize) throws IOException { + public void nextVector(ColumnVector previousVector, + boolean[] isNull, + final int batchSize, + FilterContext filterContext, + ReadPhase readPhase) throws IOException { if (vectors == null) { - super.nextVector(previousVector, isNull, batchSize); + super.nextVector(previousVector, isNull, batchSize, filterContext, readPhase); return; } vectors.get(vectorIndex++).shallowCopyTo(previousVector); @@ -1799,6 +1849,7 @@ public static class StreamReaderBuilder { private CompressionCodec compressionCodec; private OrcProto.ColumnEncoding columnEncoding; private List vectors; + private TreeReaderFactory.Context context; public StreamReaderBuilder setColumnIndex(int columnIndex) { this.columnIndex = columnIndex; @@ -1858,7 +1909,12 @@ public VarcharStreamReader build() throws IOException { boolean isFileCompressed = compressionCodec != null; return new VarcharStreamReader(columnIndex, maxLength, present, data, length, - dictionary, isFileCompressed, columnEncoding, vectors); + dictionary, isFileCompressed, columnEncoding, vectors, context); + } + + public StreamReaderBuilder setContext(TreeReaderFactory.Context context) { + this.context = context; + return this; } public StreamReaderBuilder setVectors(List vectors) { @@ -1882,8 +1938,8 @@ protected static class ByteStreamReader extends ByteTreeReader implements Settab private ByteStreamReader(int columnId, SettableUncompressedStream present, SettableUncompressedStream data, boolean isFileCompressed, - List vectors) throws IOException { - super(columnId, present, data); + List vectors, TreeReaderFactory.Context context) throws IOException { + super(columnId, present, data, context); this._isFileCompressed = isFileCompressed; this._presentStream = present; this._dataStream = data; @@ -1891,7 +1947,7 @@ private ByteStreamReader(int columnId, SettableUncompressedStream present, } @Override - public void seek(PositionProvider index) throws IOException { + public void seek(PositionProvider index, ReadPhase readPhase) throws IOException { if (vectors != null) return; if (present != null) { if (_isFileCompressed) { @@ -1911,10 +1967,13 @@ public void seek(PositionProvider index) throws IOException { } @Override - public void nextVector( - ColumnVector previousVector, boolean[] isNull, int batchSize) throws IOException { + public void nextVector(ColumnVector previousVector, + boolean[] isNull, + final int batchSize, + FilterContext filterContext, + ReadPhase readPhase) throws IOException { if (vectors == null) { - super.nextVector(previousVector, isNull, batchSize); + super.nextVector(previousVector, isNull, batchSize, filterContext, readPhase); return; } vectors.get(vectorIndex++).shallowCopyTo(previousVector); @@ -1942,6 +2001,7 @@ public static class StreamReaderBuilder { private ColumnStreamData dataStream; private CompressionCodec compressionCodec; private List vectors; + private TreeReaderFactory.Context context; public StreamReaderBuilder setColumnIndex(int columnIndex) { this.columnIndex = columnIndex; @@ -1973,7 +2033,12 @@ public ByteStreamReader build() throws IOException { dataStream); boolean isFileCompressed = compressionCodec != null; - return new ByteStreamReader(columnIndex, present, data, isFileCompressed, vectors); + return new ByteStreamReader(columnIndex, present, data, isFileCompressed, vectors, context); + } + + public StreamReaderBuilder setContext(TreeReaderFactory.Context context) { + this.context = context; + return this; } public StreamReaderBuilder setVectors(List vectors) { @@ -2008,10 +2073,13 @@ private BinaryStreamReader(int columnId, SettableUncompressedStream present, } @Override - public void nextVector( - ColumnVector previousVector, boolean[] isNull, int batchSize) throws IOException { + public void nextVector(ColumnVector previousVector, + boolean[] isNull, + final int batchSize, + FilterContext filterContext, + ReadPhase readPhase) throws IOException { if (vectors == null) { - super.nextVector(previousVector, isNull, batchSize); + super.nextVector(previousVector, isNull, batchSize, filterContext, readPhase); return; } vectors.get(vectorIndex++).shallowCopyTo(previousVector); @@ -2021,7 +2089,7 @@ public void nextVector( } @Override - public void seek(PositionProvider index) throws IOException { + public void seek(PositionProvider index, ReadPhase readPhase) throws IOException { if (vectors != null) return; if (present != null) { if (_isFileCompressed) { @@ -2143,8 +2211,8 @@ protected static class BooleanStreamReader extends BooleanTreeReader implements private BooleanStreamReader(int columnId, SettableUncompressedStream present, SettableUncompressedStream data, boolean isFileCompressed, - List vectors) throws IOException { - super(columnId, present, data); + List vectors, TreeReaderFactory.Context context) throws IOException { + super(columnId, present, data, context); this._isFileCompressed = isFileCompressed; this._presentStream = present; this._dataStream = data; @@ -2152,7 +2220,7 @@ private BooleanStreamReader(int columnId, SettableUncompressedStream present, } @Override - public void seek(PositionProvider index) throws IOException { + public void seek(PositionProvider index, ReadPhase readPhase) throws IOException { if (vectors != null) return; if (present != null) { if (_isFileCompressed) { @@ -2172,10 +2240,13 @@ public void seek(PositionProvider index) throws IOException { } @Override - public void nextVector( - ColumnVector previousVector, boolean[] isNull, int batchSize) throws IOException { + public void nextVector(ColumnVector previousVector, + boolean[] isNull, + final int batchSize, + FilterContext filterContext, + ReadPhase readPhase) throws IOException { if (vectors == null) { - super.nextVector(previousVector, isNull, batchSize); + super.nextVector(previousVector, isNull, batchSize, filterContext, readPhase); return; } vectors.get(vectorIndex++).shallowCopyTo(previousVector); @@ -2203,6 +2274,7 @@ public static class StreamReaderBuilder { private ColumnStreamData dataStream; private CompressionCodec compressionCodec; private List vectors; + private TreeReaderFactory.Context context; public StreamReaderBuilder setColumnIndex(int columnIndex) { this.columnIndex = columnIndex; @@ -2234,7 +2306,12 @@ public BooleanStreamReader build() throws IOException { dataStream); boolean isFileCompressed = compressionCodec != null; - return new BooleanStreamReader(columnIndex, present, data, isFileCompressed, vectors); + return new BooleanStreamReader(columnIndex, present, data, isFileCompressed, vectors, context); + } + + public StreamReaderBuilder setContext(TreeReaderFactory.Context context) { + this.context = context; + return this; } public StreamReaderBuilder setVectors(List vectors) { @@ -2445,6 +2522,7 @@ private static TreeReader getPrimitiveTreeReader(final int columnIndex, .setDataStream(data) .setCompressionCodec(codec) .setVectors(vectors) + .setContext(context) .build(); case BYTE: return ByteStreamReader.builder() @@ -2453,6 +2531,7 @@ private static TreeReader getPrimitiveTreeReader(final int columnIndex, .setDataStream(data) .setCompressionCodec(codec) .setVectors(vectors) + .setContext(context) .build(); case SHORT: return ShortStreamReader.builder() @@ -2491,6 +2570,7 @@ private static TreeReader getPrimitiveTreeReader(final int columnIndex, .setDataStream(data) .setCompressionCodec(codec) .setVectors(vectors) + .setContext(context) .build(); case DOUBLE: return DoubleStreamReader.builder() @@ -2499,6 +2579,7 @@ private static TreeReader getPrimitiveTreeReader(final int columnIndex, .setDataStream(data) .setCompressionCodec(codec) .setVectors(vectors) + .setContext(context) .build(); case CHAR: return CharStreamReader.builder() @@ -2511,6 +2592,7 @@ private static TreeReader getPrimitiveTreeReader(final int columnIndex, .setCompressionCodec(codec) .setColumnEncoding(columnEncoding) .setVectors(vectors) + .setContext(context) .build(); case VARCHAR: return VarcharStreamReader.builder() @@ -2523,6 +2605,7 @@ private static TreeReader getPrimitiveTreeReader(final int columnIndex, .setCompressionCodec(codec) .setColumnEncoding(columnEncoding) .setVectors(vectors) + .setContext(context) .build(); case STRING: return StringStreamReader.builder() @@ -2534,6 +2617,7 @@ private static TreeReader getPrimitiveTreeReader(final int columnIndex, .setCompressionCodec(codec) .setColumnEncoding(columnEncoding) .setVectors(vectors) + .setContext(context) .build(); case DECIMAL: // special handling for serde reader (text) in llap IO. @@ -2622,7 +2706,7 @@ public ListStreamReader(final int columnIndex, } @Override - public void seek(PositionProvider[] index) throws IOException { + public void seek(PositionProvider[] index, ReadPhase readPhase) throws IOException { PositionProvider ownIndex = index[columnId]; if (present != null) { if (_isFileCompressed) { @@ -2638,12 +2722,12 @@ public void seek(PositionProvider[] index) throws IOException { ownIndex.getNext(); } lengths.seek(ownIndex); - elementReader.seek(index); + elementReader.seek(index, readPhase); } } @Override - public void seek(PositionProvider index) throws IOException { + public void seek(PositionProvider index, ReadPhase readPhase) throws IOException { // Only our parent class can call this. throw new IOException("Should never be called"); } @@ -2747,7 +2831,7 @@ public MapStreamReader(final int columnIndex, } @Override - public void seek(PositionProvider[] index) throws IOException { + public void seek(PositionProvider[] index, ReadPhase readPhase) throws IOException { // We are not calling super.seek since we handle the present stream differently. PositionProvider ownIndex = index[columnId]; if (present != null) { @@ -2764,13 +2848,13 @@ public void seek(PositionProvider[] index) throws IOException { ownIndex.getNext(); } lengths.seek(ownIndex); - keyReader.seek(index); - valueReader.seek(index); + keyReader.seek(index, readPhase); + valueReader.seek(index, readPhase); } } @Override - public void seek(PositionProvider index) throws IOException { + public void seek(PositionProvider index, ReadPhase readPhase) throws IOException { // Only our parent class can call this. throw new IOException("Should never be called"); } @@ -2882,7 +2966,7 @@ public StructStreamReader(final int columnIndex, } @Override - public void seek(PositionProvider[] index) throws IOException { + public void seek(PositionProvider[] index, ReadPhase readPhase) throws IOException { PositionProvider ownIndex = index[columnId]; if (present != null) { if (_isFileCompressed) { @@ -2891,16 +2975,16 @@ public void seek(PositionProvider[] index) throws IOException { present.seek(ownIndex); } if (fields != null) { - for (TreeReader child : fields) { + for (TypeReader child : fields) { if (child != null) { - child.seek(index); + child.seek(index, readPhase); } } } } @Override - public void seek(PositionProvider index) throws IOException { + public void seek(PositionProvider index, ReadPhase readPhase) throws IOException { // Only our parent class can call this. throw new IOException("Should never be called"); } @@ -2915,7 +2999,7 @@ public void setBuffers(EncodedColumnBatch batch, boolean sameStripe StreamUtils.createDiskRangeInfo(streamsData[OrcProto.Stream.Kind.PRESENT_VALUE])); } if (fields != null) { - for (TreeReader child : fields) { + for (TypeReader child : fields) { if (child != null) { ((SettableTreeReader) child).setBuffers(batch, sameStripe); } @@ -2995,7 +3079,7 @@ public UnionStreamReader(final int columnIndex, } @Override - public void seek(PositionProvider[] index) throws IOException { + public void seek(PositionProvider[] index, ReadPhase readPhase) throws IOException { PositionProvider ownIndex = index[columnId]; if (present != null) { if (_isFileCompressed) { @@ -3012,15 +3096,15 @@ public void seek(PositionProvider[] index) throws IOException { } tags.seek(ownIndex); if (fields != null) { - for (TreeReader child : fields) { - child.seek(index); + for (TypeReader child : fields) { + child.seek(index, readPhase); } } } } @Override - public void seek(PositionProvider index) throws IOException { + public void seek(PositionProvider index, ReadPhase readPhase) throws IOException { // Only our parent class can call this. throw new IOException("Should never be called"); } @@ -3038,7 +3122,7 @@ public void setBuffers(EncodedColumnBatch batch, boolean sameStripe StreamUtils.createDiskRangeInfo(streamsData[OrcProto.Stream.Kind.DATA_VALUE])); } if (fields != null) { - for (TreeReader child : fields) { + for (TypeReader child : fields) { ((SettableTreeReader) child).setBuffers(batch, sameStripe); } } diff --git a/standalone-metastore/pom.xml b/standalone-metastore/pom.xml index 9b3d3a3dff30..c21b0c3f95dc 100644 --- a/standalone-metastore/pom.xml +++ b/standalone-metastore/pom.xml @@ -93,7 +93,7 @@ 0.14.1 2.13.2 3.3.3 - 1.6.9 + 1.7.2 com.google.protobuf 2.6.1