From b206ff1e0da54457995e05a3c0afad62d3c97e96 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Fri, 24 Jan 2025 14:26:07 +0800 Subject: [PATCH 1/4] [core] Refactor ColumnarRowIterator using LongIterator --- .../data/columnar/ColumnarRowIterator.java | 26 ++++++++++++------- .../data/columnar/VectorizedRowIterator.java | 4 +-- 2 files changed, 19 insertions(+), 11 deletions(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java index c1ed028acdbe..2e34b1d66f92 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java @@ -42,7 +42,9 @@ public class ColumnarRowIterator extends RecyclableIterator protected int num; protected int nextPos; - protected long[] positions; + protected int callReturnedPositionTimes; + protected long returnedPosition; + protected LongIterator positionIterator; public ColumnarRowIterator(Path filePath, ColumnarRow row, @Nullable Runnable recycler) { super(recycler); @@ -56,12 +58,11 @@ public void reset(long nextFilePos) { } public void reset(LongIterator positions) { + this.positionIterator = positions; this.num = row.batch().getNumRows(); - this.positions = new long[num]; - for (int i = 0; i < num; i++) { - this.positions[i] = positions.next(); - } this.nextPos = 0; + this.callReturnedPositionTimes = 0; + this.returnedPosition = -1; } @Nullable @@ -77,10 +78,15 @@ public InternalRow next() { @Override public long returnedPosition() { - if (nextPos == 0) { - return positions[0] - 1; + for (int i = 0; i < nextPos - callReturnedPositionTimes; i++) { + returnedPosition = positionIterator.next(); + } + callReturnedPositionTimes = nextPos; + if (returnedPosition == -1) { + throw new IllegalStateException("returnedPosition() is called before next()"); } - return 
positions[nextPos - 1]; + + return returnedPosition; } @Override @@ -89,9 +95,11 @@ public Path filePath() { } protected ColumnarRowIterator copy(ColumnVector[] vectors) { + // We should call copy only when the iterator is at the beginning of the file. + assert nextPos == 0; ColumnarRowIterator newIterator = new ColumnarRowIterator(filePath, row.copy(vectors), recycler); - newIterator.reset(LongIterator.fromArray(positions)); + newIterator.reset(positionIterator); return newIterator; } diff --git a/paimon-common/src/main/java/org/apache/paimon/data/columnar/VectorizedRowIterator.java b/paimon-common/src/main/java/org/apache/paimon/data/columnar/VectorizedRowIterator.java index fa9b1ded8412..060fedda81b7 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/columnar/VectorizedRowIterator.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/columnar/VectorizedRowIterator.java @@ -20,7 +20,6 @@ import org.apache.paimon.fs.Path; import org.apache.paimon.reader.VectorizedRecordIterator; -import org.apache.paimon.utils.LongIterator; import javax.annotation.Nullable; @@ -38,9 +37,10 @@ public VectorizedColumnBatch batch() { @Override protected VectorizedRowIterator copy(ColumnVector[] vectors) { + assert nextPos == 0; VectorizedRowIterator newIterator = new VectorizedRowIterator(filePath, row.copy(vectors), recycler); - newIterator.reset(LongIterator.fromArray(positions)); + newIterator.reset(positionIterator); return newIterator; } } From 812b354379a0fb38bc2bd402e99d3c50dd7f451a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Fri, 24 Jan 2025 14:29:28 +0800 Subject: [PATCH 2/4] fix arrow converter misuse --- .../ArrowVectorizedBatchConverter.java | 37 +++++++++++-------- 1 file changed, 22 insertions(+), 15 deletions(-) diff --git a/paimon-arrow/src/main/java/org/apache/paimon/arrow/converter/ArrowVectorizedBatchConverter.java b/paimon-arrow/src/main/java/org/apache/paimon/arrow/converter/ArrowVectorizedBatchConverter.java index 
8e12d56dcdee..cc7c6dc3014c 100644 --- a/paimon-arrow/src/main/java/org/apache/paimon/arrow/converter/ArrowVectorizedBatchConverter.java +++ b/paimon-arrow/src/main/java/org/apache/paimon/arrow/converter/ArrowVectorizedBatchConverter.java @@ -32,6 +32,8 @@ import javax.annotation.Nullable; +import java.io.IOException; + /** To convert {@link VectorizedColumnBatch} to Arrow format. */ public class ArrowVectorizedBatchConverter extends ArrowBatchConverter { @@ -65,22 +67,27 @@ public void reset(ApplyDeletionFileRecordIterator iterator) { FileRecordIterator innerIterator = iterator.iterator(); this.batch = ((VectorizedRecordIterator) innerIterator).batch(); - long firstReturnedPosition = innerIterator.returnedPosition() + 1; - DeletionVector deletionVector = iterator.deletionVector(); - int originNumRows = this.batch.getNumRows(); - IntArrayList picked = new IntArrayList(originNumRows); - for (int i = 0; i < originNumRows; i++) { - long returnedPosition = firstReturnedPosition + i; - if (!deletionVector.isDeleted(returnedPosition)) { - picked.add(i); + try { + DeletionVector deletionVector = iterator.deletionVector(); + int originNumRows = this.batch.getNumRows(); + IntArrayList picked = new IntArrayList(originNumRows); + for (int i = 0; i < originNumRows; i++) { + innerIterator.next(); + long returnedPosition = innerIterator.returnedPosition(); + if (!deletionVector.isDeleted(returnedPosition)) { + picked.add(i); + } } - } - if (picked.size() == originNumRows) { - this.pickedInColumn = null; - this.totalNumRows = originNumRows; - } else { - this.pickedInColumn = picked.toArray(); - this.totalNumRows = this.pickedInColumn.length; + + if (picked.size() == originNumRows) { + this.pickedInColumn = null; + this.totalNumRows = originNumRows; + } else { + this.pickedInColumn = picked.toArray(); + this.totalNumRows = this.pickedInColumn.length; + } + } catch (IOException e) { + throw new RuntimeException("Failed to apply deletion vector.", e); } this.startIndex = 0; From 
365c8ee3317a7aa4b395eb7e55d9b2f2b65508c5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Fri, 24 Jan 2025 14:55:30 +0800 Subject: [PATCH 3/4] Fix comment --- .../data/columnar/ColumnarRowIterator.java | 20 +++--- .../data/columnar/VectorizedRowIterator.java | 4 +- .../columnar/ColumnarRowIteratorTest.java | 66 +++++++++++++++++++ .../heap/RowColumnVectorTest.java | 5 +- 4 files changed, 81 insertions(+), 14 deletions(-) create mode 100644 paimon-common/src/test/java/org/apache/paimon/data/columnar/ColumnarRowIteratorTest.java rename paimon-common/src/test/java/org/apache/paimon/data/{calumnar => columnar}/heap/RowColumnVectorTest.java (90%) diff --git a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java index 2e34b1d66f92..02bfe5912d3f 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java @@ -29,6 +29,8 @@ import javax.annotation.Nullable; +import static org.apache.paimon.utils.Preconditions.checkArgument; + /** * A {@link RecordReader.RecordIterator} that returns {@link InternalRow}s. The next row is set by * {@link ColumnarRow#setRowId}. 
@@ -41,8 +43,8 @@ public class ColumnarRowIterator extends RecyclableIterator protected final Runnable recycler; protected int num; - protected int nextPos; - protected int callReturnedPositionTimes; + protected int index; + protected int returnedPositionIndex; protected long returnedPosition; protected LongIterator positionIterator; @@ -60,16 +62,16 @@ public void reset(long nextFilePos) { public void reset(LongIterator positions) { this.positionIterator = positions; this.num = row.batch().getNumRows(); - this.nextPos = 0; - this.callReturnedPositionTimes = 0; + this.index = 0; + this.returnedPositionIndex = 0; this.returnedPosition = -1; } @Nullable @Override public InternalRow next() { - if (nextPos < num) { - row.setRowId(nextPos++); + if (index < num) { + row.setRowId(index++); return row; } else { return null; @@ -78,10 +80,10 @@ public InternalRow next() { @Override public long returnedPosition() { - for (int i = 0; i < nextPos - callReturnedPositionTimes; i++) { + for (int i = 0; i < index - returnedPositionIndex; i++) { returnedPosition = positionIterator.next(); } - callReturnedPositionTimes = nextPos; + returnedPositionIndex = index; if (returnedPosition == -1) { throw new IllegalStateException("returnedPosition() is called before next()"); } @@ -96,7 +98,7 @@ public Path filePath() { protected ColumnarRowIterator copy(ColumnVector[] vectors) { // We should call copy only when the iterator is at the beginning of the file. 
- assert nextPos == 0; + checkArgument(returnedPositionIndex == 0, "copy() should not be called after next()"); ColumnarRowIterator newIterator = new ColumnarRowIterator(filePath, row.copy(vectors), recycler); newIterator.reset(positionIterator); diff --git a/paimon-common/src/main/java/org/apache/paimon/data/columnar/VectorizedRowIterator.java b/paimon-common/src/main/java/org/apache/paimon/data/columnar/VectorizedRowIterator.java index 060fedda81b7..dfec00a39b72 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/columnar/VectorizedRowIterator.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/columnar/VectorizedRowIterator.java @@ -23,6 +23,8 @@ import javax.annotation.Nullable; +import static org.apache.paimon.utils.Preconditions.checkArgument; + /** A {@link ColumnarRowIterator} with {@link VectorizedRecordIterator}. */ public class VectorizedRowIterator extends ColumnarRowIterator implements VectorizedRecordIterator { @@ -37,7 +39,7 @@ public VectorizedColumnBatch batch() { @Override protected VectorizedRowIterator copy(ColumnVector[] vectors) { - assert nextPos == 0; + checkArgument(returnedPositionIndex == 0, "copy() should not be called after next()"); VectorizedRowIterator newIterator = new VectorizedRowIterator(filePath, row.copy(vectors), recycler); newIterator.reset(positionIterator); diff --git a/paimon-common/src/test/java/org/apache/paimon/data/columnar/ColumnarRowIteratorTest.java b/paimon-common/src/test/java/org/apache/paimon/data/columnar/ColumnarRowIteratorTest.java new file mode 100644 index 000000000000..50f244f5c220 --- /dev/null +++ b/paimon-common/src/test/java/org/apache/paimon/data/columnar/ColumnarRowIteratorTest.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.data.columnar; + +import org.apache.paimon.data.columnar.heap.HeapIntVector; +import org.apache.paimon.fs.Path; +import org.apache.paimon.utils.LongIterator; + +import org.junit.jupiter.api.Test; + +import java.util.Random; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatCode; + +/** Test for {@link ColumnarRowIterator}. 
*/ +public class ColumnarRowIteratorTest { + + @Test + public void testRowIterator() { + Random random = new Random(); + HeapIntVector heapIntVector = new HeapIntVector(100); + for (int i = 0; i < 100; i++) { + heapIntVector.setInt(i, random.nextInt()); + } + long[] positions = new long[100]; + positions[0] = random.nextInt(10); + for (int i = 1; i < 100; i++) { + positions[i] = positions[i - 1] + random.nextInt(100); + } + + VectorizedColumnBatch vectorizedColumnBatch = + new VectorizedColumnBatch(new ColumnVector[] {heapIntVector}); + vectorizedColumnBatch.setNumRows(100); + ColumnarRowIterator rowIterator = + new ColumnarRowIterator( + new Path("test"), new ColumnarRow(vectorizedColumnBatch), null); + rowIterator.reset(LongIterator.fromArray(positions)); + assertThatCode(rowIterator::returnedPosition) + .hasMessage("returnedPosition() is called before next()"); + rowIterator.next(); + for (int i = 0; i < random.nextInt(10); i++) { + for (int j = 0; j < random.nextInt(9); j++) { + rowIterator.next(); + } + assertThat(rowIterator.returnedPosition()) + .isEqualTo(positions[rowIterator.index - 1]); + } + } +} diff --git a/paimon-common/src/test/java/org/apache/paimon/data/calumnar/heap/RowColumnVectorTest.java b/paimon-common/src/test/java/org/apache/paimon/data/columnar/heap/RowColumnVectorTest.java similarity index 90% rename from paimon-common/src/test/java/org/apache/paimon/data/calumnar/heap/RowColumnVectorTest.java rename to paimon-common/src/test/java/org/apache/paimon/data/columnar/heap/RowColumnVectorTest.java index bc7c127a634d..97b00f590bf0 100644 --- a/paimon-common/src/test/java/org/apache/paimon/data/calumnar/heap/RowColumnVectorTest.java +++ b/paimon-common/src/test/java/org/apache/paimon/data/columnar/heap/RowColumnVectorTest.java @@ -16,13 +16,10 @@ * limitations under the License. 
*/ -package org.apache.paimon.data.calumnar.heap; +package org.apache.paimon.data.columnar.heap; import org.apache.paimon.data.InternalRow; import org.apache.paimon.data.columnar.ColumnVector; -import org.apache.paimon.data.columnar.heap.CastedRowColumnVector; -import org.apache.paimon.data.columnar.heap.HeapIntVector; -import org.apache.paimon.data.columnar.heap.HeapRowVector; import org.junit.jupiter.api.Test; From f7852ea72cf0be1bd75483a20756b971ef558a7f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Fri, 24 Jan 2025 15:01:40 +0800 Subject: [PATCH 4/4] fix style --- .../apache/paimon/data/columnar/ColumnarRowIteratorTest.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/paimon-common/src/test/java/org/apache/paimon/data/columnar/ColumnarRowIteratorTest.java b/paimon-common/src/test/java/org/apache/paimon/data/columnar/ColumnarRowIteratorTest.java index 50f244f5c220..8ab926a193cd 100644 --- a/paimon-common/src/test/java/org/apache/paimon/data/columnar/ColumnarRowIteratorTest.java +++ b/paimon-common/src/test/java/org/apache/paimon/data/columnar/ColumnarRowIteratorTest.java @@ -59,8 +59,7 @@ public void testRowIterator() { for (int j = 0; j < random.nextInt(9); j++) { rowIterator.next(); } - assertThat(rowIterator.returnedPosition()) - .isEqualTo(positions[rowIterator.index - 1]); + assertThat(rowIterator.returnedPosition()).isEqualTo(positions[rowIterator.index - 1]); } } }