From 87070aa9d5d8aa5b007358e2abb4b8e9300b0222 Mon Sep 17 00:00:00 2001 From: yuzelin Date: Wed, 22 Jan 2025 17:08:35 +0800 Subject: [PATCH 1/2] [core] Throw exception if increment query with rescale bucket --- .../table/source/AbstractDataTableScan.java | 20 ++++++++++- .../IncrementalTagStartingScanner.java | 14 +++++++- .../IncrementalTimeStampStartingScanner.java | 10 ++++++ .../table/source/snapshot/TimeTravelUtil.java | 24 +++++++++++-- .../paimon/flink/BatchFileStoreITCase.java | 34 +++++++++++++++++++ 5 files changed, 98 insertions(+), 4 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java index 59b11281cc78..a176bd56cae5 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java @@ -28,6 +28,7 @@ import org.apache.paimon.metrics.MetricRegistry; import org.apache.paimon.operation.FileStoreScan; import org.apache.paimon.options.Options; +import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.table.source.snapshot.CompactedStartingScanner; import org.apache.paimon.table.source.snapshot.ContinuousCompactorStartingScanner; import org.apache.paimon.table.source.snapshot.ContinuousFromSnapshotFullStartingScanner; @@ -46,6 +47,7 @@ import org.apache.paimon.table.source.snapshot.StaticFromTagStartingScanner; import org.apache.paimon.table.source.snapshot.StaticFromTimestampStartingScanner; import org.apache.paimon.table.source.snapshot.StaticFromWatermarkStartingScanner; +import org.apache.paimon.table.source.snapshot.TimeTravelUtil; import org.apache.paimon.tag.Tag; import org.apache.paimon.utils.Filter; import org.apache.paimon.utils.Pair; @@ -245,7 +247,15 @@ private StartingScanner createIncrementalStartingScanner(SnapshotManager snapsho Options conf = options.toConfiguration(); TagManager tagManager = - new TagManager(snapshotManager.fileIO(), snapshotManager.tablePath()); + new TagManager( + snapshotManager.fileIO(), + snapshotManager.tablePath(), + snapshotManager.branch()); + SchemaManager schemaManager = + new SchemaManager( + snapshotManager.fileIO(), + snapshotManager.tablePath(), + snapshotManager.branch()); if (conf.contains(CoreOptions.INCREMENTAL_BETWEEN)) { Pair incrementalBetween = options.incrementalBetween(); Optional startTag = tagManager.get(incrementalBetween.getLeft()); @@ -269,6 +279,9 @@ private StartingScanner createIncrementalStartingScanner(SnapshotManager snapsho incrementalBetween.getLeft(), start.id())); } + + TimeTravelUtil.checkRescaleBucketForIncrementalQuery( + schemaManager, start.schemaId(), end.schemaId()); return new IncrementalTagStartingScanner(snapshotManager, start, end); } else { long startId, endId; @@ -282,6 +295,11 @@ private StartingScanner createIncrementalStartingScanner(SnapshotManager snapsho + "Please set two tags or two snapshot Ids.", incrementalBetween.getLeft(), incrementalBetween.getRight())); } + + TimeTravelUtil.checkRescaleBucketForIncrementalQuery( + schemaManager, + snapshotManager.snapshot(startId).schemaId(), + snapshotManager.snapshot(endId).schemaId()); return new IncrementalStartingScanner(snapshotManager, startId, endId, scanMode); } } else if (conf.contains(CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP)) { diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java index 8c4b5d5cec01..d2f3e672ff2c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java @@ -20,6 +20,7 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.Snapshot; +import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.tag.Tag; import org.apache.paimon.tag.TagPeriodHandler; import org.apache.paimon.utils.Pair; @@ -66,7 +67,10 @@ public static AbstractStartingScanner create( endTagName); TagManager tagManager = - new TagManager(snapshotManager.fileIO(), snapshotManager.tablePath()); + new TagManager( + snapshotManager.fileIO(), + snapshotManager.tablePath(), + snapshotManager.branch()); Optional endTag = tagManager.get(endTagName); if (!endTag.isPresent()) { @@ -92,6 +96,14 @@ public static AbstractStartingScanner create( LOG.info("Found start tag {} .", periodHandler.timeToTag(previousTags.get(0).getRight())); Snapshot start = previousTags.get(0).getLeft().trimToSnapshot(); + TimeTravelUtil.checkRescaleBucketForIncrementalQuery( + new SchemaManager( + snapshotManager.fileIO(), + snapshotManager.tablePath(), + snapshotManager.branch()), + start.schemaId(), + end.schemaId()); + return new IncrementalTagStartingScanner(snapshotManager, start, end); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTimeStampStartingScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTimeStampStartingScanner.java index bccb0b3d3aa5..dc2e1bbd0c13 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTimeStampStartingScanner.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTimeStampStartingScanner.java @@ -19,6 +19,7 @@ package org.apache.paimon.table.source.snapshot; import org.apache.paimon.Snapshot; +import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.table.source.ScanMode; import org.apache.paimon.utils.SnapshotManager; @@ -64,6 +65,15 @@ public Result scan(SnapshotReader reader) { : startingSnapshotId; Snapshot endSnapshot = snapshotManager.earlierOrEqualTimeMills(endTimestamp); Long endSnapshotId = (endSnapshot == null) ? latestSnapshot.id() : endSnapshot.id(); + + TimeTravelUtil.checkRescaleBucketForIncrementalQuery( + new SchemaManager( + snapshotManager.fileIO(), + snapshotManager.tablePath(), + snapshotManager.branch()), + snapshotManager.snapshot(startSnapshotId).schemaId(), + snapshotManager.snapshot(endSnapshotId).schemaId()); + IncrementalStartingScanner incrementalStartingScanner = new IncrementalStartingScanner( snapshotManager, startSnapshotId, endSnapshotId, scanMode); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java index 5b4ee4e58c50..d976ed0742e6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java @@ -20,8 +20,9 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.Snapshot; +import org.apache.paimon.schema.SchemaManager; +import org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.FileStoreTable; -import org.apache.paimon.utils.Preconditions; import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.SnapshotNotExistException; import org.apache.paimon.utils.TagManager; @@ -30,6 +31,7 @@ import java.util.List; import static org.apache.paimon.CoreOptions.SCAN_SNAPSHOT_ID; +import static org.apache.paimon.utils.Preconditions.checkArgument; /** The util class of resolve snapshot from scan params for time travel. */ public class TimeTravelUtil { @@ -58,7 +60,7 @@ public static Snapshot resolveSnapshotFromOptions( return snapshotManager.latestSnapshot(); } - Preconditions.checkArgument( + checkArgument( scanHandleKey.size() == 1, String.format( "Only one of the following parameters may be set : [%s, %s, %s, %s]", @@ -124,4 +126,22 @@ private static Snapshot resolveSnapshotByTagName( new TagManager(snapshotManager.fileIO(), snapshotManager.tablePath()); return tagManager.getOrThrow(tagName).trimToSnapshot(); } + + public static void checkRescaleBucketForIncrementalQuery( + SchemaManager schemaManager, long schemaId1, long schemaId2) { + if (schemaId1 != schemaId2) { + int bucketNumber1 = bucketNumber(schemaManager, schemaId1); + int bucketNumber2 = bucketNumber(schemaManager, schemaId2); + checkArgument( + bucketNumber1 == bucketNumber2, + "The bucket number of two snapshots are different (%s, %s), which is not supported in incremental query.", + bucketNumber1, + bucketNumber2); + } + } + + private static int bucketNumber(SchemaManager schemaManager, long schemaId) { + TableSchema schema = schemaManager.schema(schemaId); + return CoreOptions.fromMap(schema.options()).bucket(); + } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java index ee24dc8ef352..cb122cd3cf6d 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java @@ -21,6 +21,7 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.flink.util.AbstractTestBase; import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.Table; import org.apache.paimon.utils.BlockingIterator; import org.apache.paimon.utils.DateTimeUtils; import org.apache.paimon.utils.SnapshotNotExistException; @@ -37,6 +38,7 @@ import org.junit.jupiter.params.provider.ValueSource; import java.math.BigDecimal; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -652,6 +654,38 @@ public void testScanBounded() { assertThat(result).containsExactlyInAnyOrder(Row.of(1, 11, 111), Row.of(2, 22, 222)); } + @Test + public void testIncrementQueryWithRescaleBucket() throws Exception { + sql("CREATE TABLE test (a INT PRIMARY KEY NOT ENFORCED, b INT) WITH ('bucket' = '1')"); + Table table = paimonTable("test"); + + sql("INSERT INTO test VALUES (1, 11), (2, 22)"); + long timestamp1 = System.currentTimeMillis(); + sql("ALTER TABLE test SET ('bucket' = '2')"); + sql("INSERT OVERWRITE test SELECT * FROM test"); + sql("INSERT INTO test VALUES (3, 33)"); + long timestamp2 = System.currentTimeMillis(); + + table.createTag("2024-01-01", 1); + table.createTag("2024-01-02", 3); + + List incrementalOptions = + Arrays.asList( + "'incremental-between'='1,3'", + "'incremental-between'='2024-01-01,2024-01-02'", + "'incremental-to-auto-tag'='2024-01-02'", + String.format( + "'incremental-between-timestamp'='%s,%s'", timestamp1, timestamp2)); + + for (String option : incrementalOptions) { + assertThatThrownBy(() -> sql("SELECT * FROM test /*+ OPTIONS (%s) */", option)) + .satisfies( + anyCauseMatches( + IllegalArgumentException.class, + "The bucket number of two snapshots are different (1, 2), which is not supported in incremental query.")); + } + } + private void validateCount1PushDown(String sql) { Transformation transformation = AbstractTestBase.translate(tEnv, sql); while (!transformation.getInputs().isEmpty()) { From fc026c780fe055ccf72d1e36d7287df5ed0099e8 Mon Sep 17 00:00:00 2001 From: yuzelin Date: Wed, 22 Jan 2025 17:59:32 +0800 Subject: [PATCH 2/2] fix --- .../table/source/AbstractDataTableScan.java | 15 --------------- .../snapshot/IncrementalTagStartingScanner.java | 16 ++++++++-------- .../IncrementalTimeStampStartingScanner.java | 10 ---------- .../table/source/snapshot/TimeTravelUtil.java | 4 ++-- .../paimon/flink/BatchFileStoreITCase.java | 11 +++-------- 5 files changed, 13 insertions(+), 43 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java index a176bd56cae5..5bb9ba1378cb 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java @@ -28,7 +28,6 @@ import org.apache.paimon.metrics.MetricRegistry; import org.apache.paimon.operation.FileStoreScan; import org.apache.paimon.options.Options; -import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.table.source.snapshot.CompactedStartingScanner; import org.apache.paimon.table.source.snapshot.ContinuousCompactorStartingScanner; import org.apache.paimon.table.source.snapshot.ContinuousFromSnapshotFullStartingScanner; @@ -47,7 +46,6 @@ import org.apache.paimon.table.source.snapshot.StaticFromTagStartingScanner; import org.apache.paimon.table.source.snapshot.StaticFromTimestampStartingScanner; import org.apache.paimon.table.source.snapshot.StaticFromWatermarkStartingScanner; -import org.apache.paimon.table.source.snapshot.TimeTravelUtil; import org.apache.paimon.tag.Tag; import org.apache.paimon.utils.Filter; import org.apache.paimon.utils.Pair; @@ -251,11 +249,6 @@ private StartingScanner createIncrementalStartingScanner(SnapshotManager snapsho snapshotManager.fileIO(), snapshotManager.tablePath(), snapshotManager.branch()); - SchemaManager schemaManager = - new SchemaManager( - snapshotManager.fileIO(), - snapshotManager.tablePath(), - snapshotManager.branch()); if (conf.contains(CoreOptions.INCREMENTAL_BETWEEN)) { Pair incrementalBetween = options.incrementalBetween(); Optional startTag = tagManager.get(incrementalBetween.getLeft()); @@ -279,9 +272,6 @@ private StartingScanner createIncrementalStartingScanner(SnapshotManager snapsho incrementalBetween.getLeft(), start.id())); } - - TimeTravelUtil.checkRescaleBucketForIncrementalQuery( - schemaManager, start.schemaId(), end.schemaId()); return new IncrementalTagStartingScanner(snapshotManager, start, end); } else { long startId, endId; @@ -295,11 +285,6 @@ private StartingScanner createIncrementalStartingScanner(SnapshotManager snapsho + "Please set two tags or two snapshot Ids.", incrementalBetween.getLeft(), incrementalBetween.getRight())); } - - TimeTravelUtil.checkRescaleBucketForIncrementalQuery( - schemaManager, - snapshotManager.snapshot(startId).schemaId(), - snapshotManager.snapshot(endId).schemaId()); return new IncrementalStartingScanner(snapshotManager, startId, endId, scanMode); } } else if (conf.contains(CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP)) { diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java index d2f3e672ff2c..835b7595a316 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java @@ -51,6 +51,14 @@ public IncrementalTagStartingScanner( this.start = start; this.end = end; this.startingSnapshotId = start.id(); + + TimeTravelUtil.checkRescaleBucketForIncrementalTagQuery( + new SchemaManager( + snapshotManager.fileIO(), + snapshotManager.tablePath(), + snapshotManager.branch()), + start.schemaId(), + end.schemaId()); } @Override @@ -96,14 +104,6 @@ public static AbstractStartingScanner create( LOG.info("Found start tag {} .", periodHandler.timeToTag(previousTags.get(0).getRight())); Snapshot start = previousTags.get(0).getLeft().trimToSnapshot(); - TimeTravelUtil.checkRescaleBucketForIncrementalQuery( - new SchemaManager( - snapshotManager.fileIO(), - snapshotManager.tablePath(), - snapshotManager.branch()), - start.schemaId(), - end.schemaId()); - return new IncrementalTagStartingScanner(snapshotManager, start, end); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTimeStampStartingScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTimeStampStartingScanner.java index dc2e1bbd0c13..bccb0b3d3aa5 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTimeStampStartingScanner.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTimeStampStartingScanner.java @@ -19,7 +19,6 @@ package org.apache.paimon.table.source.snapshot; import org.apache.paimon.Snapshot; -import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.table.source.ScanMode; import org.apache.paimon.utils.SnapshotManager; @@ -65,15 +64,6 @@ public Result scan(SnapshotReader reader) { : startingSnapshotId; Snapshot endSnapshot = snapshotManager.earlierOrEqualTimeMills(endTimestamp); Long endSnapshotId = (endSnapshot == null) ? latestSnapshot.id() : endSnapshot.id(); - - TimeTravelUtil.checkRescaleBucketForIncrementalQuery( - new SchemaManager( - snapshotManager.fileIO(), - snapshotManager.tablePath(), - snapshotManager.branch()), - snapshotManager.snapshot(startSnapshotId).schemaId(), - snapshotManager.snapshot(endSnapshotId).schemaId()); - IncrementalStartingScanner incrementalStartingScanner = new IncrementalStartingScanner( snapshotManager, startSnapshotId, endSnapshotId, scanMode); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java index d976ed0742e6..4a0f4290df91 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java @@ -127,14 +127,14 @@ private static Snapshot resolveSnapshotByTagName( return tagManager.getOrThrow(tagName).trimToSnapshot(); } - public static void checkRescaleBucketForIncrementalQuery( + public static void checkRescaleBucketForIncrementalTagQuery( SchemaManager schemaManager, long schemaId1, long schemaId2) { if (schemaId1 != schemaId2) { int bucketNumber1 = bucketNumber(schemaManager, schemaId1); int bucketNumber2 = bucketNumber(schemaManager, schemaId2); checkArgument( bucketNumber1 == bucketNumber2, - "The bucket number of two snapshots are different (%s, %s), which is not supported in incremental query.", + "The bucket number of two tags are different (%s, %s), which is not supported in incremental tag query.", bucketNumber1, bucketNumber2); } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java index cb122cd3cf6d..d9a5cab1d455 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java @@ -655,34 +655,29 @@ public void testScanBounded() { } @Test - public void testIncrementQueryWithRescaleBucket() throws Exception { + public void testIncrementTagQueryWithRescaleBucket() throws Exception { sql("CREATE TABLE test (a INT PRIMARY KEY NOT ENFORCED, b INT) WITH ('bucket' = '1')"); Table table = paimonTable("test"); sql("INSERT INTO test VALUES (1, 11), (2, 22)"); - long timestamp1 = System.currentTimeMillis(); sql("ALTER TABLE test SET ('bucket' = '2')"); sql("INSERT OVERWRITE test SELECT * FROM test"); sql("INSERT INTO test VALUES (3, 33)"); - long timestamp2 = System.currentTimeMillis(); table.createTag("2024-01-01", 1); table.createTag("2024-01-02", 3); List incrementalOptions = Arrays.asList( - "'incremental-between'='1,3'", "'incremental-between'='2024-01-01,2024-01-02'", - "'incremental-to-auto-tag'='2024-01-02'", - String.format( - "'incremental-between-timestamp'='%s,%s'", timestamp1, timestamp2)); + "'incremental-to-auto-tag'='2024-01-02'"); for (String option : incrementalOptions) { assertThatThrownBy(() -> sql("SELECT * FROM test /*+ OPTIONS (%s) */", option)) .satisfies( anyCauseMatches( IllegalArgumentException.class, - "The bucket number of two snapshots are different (1, 2), which is not supported in incremental query.")); + "The bucket number of two tags are different (1, 2), which is not supported in incremental tag query.")); } }