diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java index 508e8cb9be7..9e5594136b8 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java @@ -417,17 +417,7 @@ private SnapshotSplit createSnapshotSplit( Map schema = new HashMap<>(); schema.put(tableId, dialect.queryTableSchema(jdbc, tableId)); return new SnapshotSplit( - tableId, - splitId(tableId, chunkId), - splitKeyType, - splitStart, - splitEnd, - null, - schema); - } - - private String splitId(TableId tableId, int chunkId) { - return tableId.toString() + ":" + chunkId; + tableId, chunkId, splitKeyType, splitStart, splitEnd, null, schema); } private void maySleep(int count, TableId tableId) { diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/meta/split/SnapshotSplit.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/meta/split/SnapshotSplit.java index 0ca125a7adf..714213985a4 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/meta/split/SnapshotSplit.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/meta/split/SnapshotSplit.java @@ -17,6 +17,7 @@ package org.apache.flink.cdc.connectors.base.source.meta.split; +import org.apache.flink.cdc.common.annotation.Internal; import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset; import org.apache.flink.table.types.logical.RowType; @@ -44,6 +45,35 @@ public class SnapshotSplit extends SourceSplitBase { @Nullable transient byte[] serializedFormCache; + /** + * Create a SnapshotSplit with generating splitId with the given tableId and chunkId. + * + * @see #generateSplitId(TableId, int) + */ + public SnapshotSplit( + TableId tableId, + int chunkId, + RowType splitKeyType, + Object[] splitStart, + Object[] splitEnd, + Offset highWatermark, + Map tableSchemas) { + super(generateSplitId(tableId, chunkId)); + this.tableId = tableId; + this.splitKeyType = splitKeyType; + this.splitStart = splitStart; + this.splitEnd = splitEnd; + this.highWatermark = highWatermark; + this.tableSchemas = tableSchemas; + } + + /** + * This constructor should not be used directly. Please use the other constructor. If this + * constructor must be invoked, please use the same format for the splitId as {@link + * #generateSplitId(TableId, int)}. Or else the parsing method will fail. See more in {@link + * #extractTableId(String)} and {@link #extractChunkId(String)}. + */ + @Internal public SnapshotSplit( TableId tableId, String splitId, @@ -95,6 +125,18 @@ public final SchemalessSnapshotSplit toSchemalessSnapshotSplit() { tableId, splitId, splitKeyType, splitStart, splitEnd, highWatermark); } + public static String generateSplitId(TableId tableId, int chunkId) { + return tableId.toString() + ":" + chunkId; + } + + public static TableId extractTableId(String splitId) { + return TableId.parse(splitId.substring(0, splitId.lastIndexOf(":"))); + } + + public static int extractChunkId(String splitId) { + return Integer.parseInt(splitId.substring(splitId.lastIndexOf(":") + 1)); + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/source/assigner/state/PendingSplitsStateSerializerTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/source/assigner/state/PendingSplitsStateSerializerTest.java index 1557818f31d..33852f99797 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/source/assigner/state/PendingSplitsStateSerializerTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/source/assigner/state/PendingSplitsStateSerializerTest.java @@ -42,6 +42,8 @@ import java.util.HashMap; import java.util.Map; +import static org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit.generateSplitId; + /** Tests for {@link PendingSplitsStateSerializer}. */ public class PendingSplitsStateSerializerTest { @@ -152,7 +154,7 @@ public Offset createNoStoppingOffset() { private SchemalessSnapshotSplit constuctSchemalessSnapshotSplit() { return new SchemalessSnapshotSplit( tableId, - "test", + generateSplitId(tableId, 0), new RowType( Collections.singletonList(new RowType.RowField("id", new BigIntType()))), null, diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/source/meta/split/SourceSplitSerializerTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/source/meta/split/SourceSplitSerializerTest.java index af1c0eb33ac..f81e1f93b80 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/source/meta/split/SourceSplitSerializerTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/source/meta/split/SourceSplitSerializerTest.java @@ -88,7 +88,7 @@ public void testStreamSplitBackwardCompatibility() throws IOException { private SnapshotSplit constuctSnapshotSplit() { return new SnapshotSplit( new TableId("cata`log\"", "s\"che`ma", "ta\"ble.1`"), - "test", + 0, new RowType( Collections.singletonList(new RowType.RowField("id", new BigIntType()))), null, diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/assigners/splitters/SampleBucketSplitStrategy.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/assigners/splitters/SampleBucketSplitStrategy.java index ad3c0021358..2102560b61c 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/assigners/splitters/SampleBucketSplitStrategy.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/assigners/splitters/SampleBucketSplitStrategy.java @@ -129,7 +129,7 @@ public Collection split(SplitContext splitContext) { SnapshotSplit firstSplit = new SnapshotSplit( collectionId, - splitId(collectionId, 0), + 0, rowType, ChunkUtils.minLowerBoundOfId(), ChunkUtils.boundOfId(lowerBoundOfBucket(chunks.get(0))), @@ -142,7 +142,7 @@ public Collection split(SplitContext splitContext) { snapshotSplits.add( new SnapshotSplit( collectionId, - splitId(collectionId, i + 1), + i + 1, rowType, ChunkUtils.boundOfId(lowerBoundOfBucket(bucket)), ChunkUtils.boundOfId(upperBoundOfBucket(bucket)), @@ -153,7 +153,7 @@ public Collection split(SplitContext splitContext) { SnapshotSplit lastSplit = new SnapshotSplit( collectionId, - splitId(collectionId, chunks.size() + 1), + chunks.size() + 1, rowType, ChunkUtils.boundOfId(upperBoundOfBucket(chunks.get(chunks.size() - 1))), ChunkUtils.maxUpperBoundOfId(), diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/assigners/splitters/ShardedSplitStrategy.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/assigners/splitters/ShardedSplitStrategy.java index bc451d85bd6..7902bb02220 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/assigners/splitters/ShardedSplitStrategy.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/assigners/splitters/ShardedSplitStrategy.java @@ -116,7 +116,7 @@ && isNotShardedByHash(collectionMetadata))) { snapshotSplits.add( new SnapshotSplit( collectionId, - splitId(collectionId, i), + i, rowType, new Object[] {splitKeys, chunk.getDocument(MIN_FIELD)}, new Object[] {splitKeys, chunk.getDocument(MAX_FIELD)}, diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/assigners/splitters/SingleSplitStrategy.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/assigners/splitters/SingleSplitStrategy.java index ea57fb1fb57..9c6faf2b7cb 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/assigners/splitters/SingleSplitStrategy.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/assigners/splitters/SingleSplitStrategy.java @@ -54,7 +54,7 @@ public Collection split(SplitContext splitContext) { SnapshotSplit snapshotSplit = new SnapshotSplit( collectionId, - splitId(collectionId, 0), + 0, shardKeysToRowType(singleton(ID_FIELD)), ChunkUtils.minLowerBoundOfId(), ChunkUtils.maxUpperBoundOfId(), diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/assigners/splitters/SplitStrategy.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/assigners/splitters/SplitStrategy.java index dc826217837..fce519189fe 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/assigners/splitters/SplitStrategy.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/assigners/splitters/SplitStrategy.java @@ -22,7 +22,6 @@ import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.types.logical.RowType; -import io.debezium.relational.TableId; import org.bson.BsonDocument; import java.util.Collection; @@ -36,10 +35,6 @@ public interface SplitStrategy { Collection split(SplitContext splitContext); - default String splitId(TableId collectionId, int chunkId) { - return collectionId.identifier() + ":" + chunkId; - } - default RowType shardKeysToRowType(BsonDocument shardKeys) { return shardKeysToRowType(shardKeys.keySet()); } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/assigners/splitters/SplitVectorSplitStrategy.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/assigners/splitters/SplitVectorSplitStrategy.java index 2aae73e2599..a5abc23a82e 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/assigners/splitters/SplitVectorSplitStrategy.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/assigners/splitters/SplitVectorSplitStrategy.java @@ -113,7 +113,7 @@ public Collection split(SplitContext splitContext) { snapshotSplits.add( new SnapshotSplit( collectionId, - splitId(collectionId, i), + i, rowType, ChunkUtils.boundOfId(lowerValue), ChunkUtils.boundOfId(splitKeyValue), @@ -125,7 +125,7 @@ public Collection split(SplitContext splitContext) { SnapshotSplit lastSplit = new SnapshotSplit( collectionId, - splitId(collectionId, splitKeys.size()), + splitKeys.size(), rowType, ChunkUtils.boundOfId(lowerValue), ChunkUtils.maxUpperBoundOfId(), diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java index ec0bab77fe7..d22cc555f9f 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java @@ -369,13 +369,7 @@ private MySqlSnapshotSplit createSnapshotSplit( Map schema = new HashMap<>(); schema.put(tableId, mySqlSchema.getTableSchema(partition, jdbc, tableId)); return new MySqlSnapshotSplit( - tableId, - splitId(tableId, chunkId), - splitKeyType, - splitStart, - splitEnd, - null, - schema); + tableId, chunkId, splitKeyType, splitStart, splitEnd, null, schema); } // ------------------------------------------------------------------------------------------ @@ -455,10 +449,6 @@ private double calculateDistributionFactor( return distributionFactor; } - private static String splitId(TableId tableId, int chunkId) { - return tableId.toString() + ":" + chunkId; - } - private static void maySleep(int count, TableId tableId) { // every 10 queries to sleep 0.1s if (count % 10 == 0) { diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlSnapshotSplit.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlSnapshotSplit.java index 68b32c04a4e..c32a306e3ce 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlSnapshotSplit.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlSnapshotSplit.java @@ -17,6 +17,7 @@ package org.apache.flink.cdc.connectors.mysql.source.split; +import org.apache.flink.cdc.common.annotation.Internal; import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset; import org.apache.flink.table.types.logical.RowType; @@ -44,6 +45,35 @@ public class MySqlSnapshotSplit extends MySqlSplit { @Nullable transient byte[] serializedFormCache; + /** + * Create a SnapshotSplit with generating splitId with the given tableId and chunkId. + * + * @see #generateSplitId(TableId, int) + */ + public MySqlSnapshotSplit( + TableId tableId, + int chunkId, + RowType splitKeyType, + Object[] splitStart, + Object[] splitEnd, + BinlogOffset highWatermark, + Map tableSchemas) { + super(generateSplitId(tableId, chunkId)); + this.tableId = tableId; + this.splitKeyType = splitKeyType; + this.splitStart = splitStart; + this.splitEnd = splitEnd; + this.highWatermark = highWatermark; + this.tableSchemas = tableSchemas; + } + + /** + * This constructor should not be used directly. Please use the other constructor. If this + * constructor must be invoked, please use the same format for the splitId as {@link + * #generateSplitId(TableId, int)}. Or else the parsing method will fail. See more in {@link + * #extractTableId(String)} and {@link #extractChunkId(String)}. + */ + @Internal public MySqlSnapshotSplit( TableId tableId, String splitId, @@ -95,6 +125,18 @@ public final MySqlSchemalessSnapshotSplit toSchemalessSnapshotSplit() { tableId, splitId, splitKeyType, splitStart, splitEnd, highWatermark); } + public static String generateSplitId(TableId tableId, int chunkId) { + return tableId.toString() + ":" + chunkId; + } + + public static TableId extractTableId(String splitId) { + return TableId.parse(splitId.substring(0, splitId.lastIndexOf(":"))); + } + + public static int extractChunkId(String splitId) { + return Integer.parseInt(splitId.substring(splitId.lastIndexOf(":") + 1)); + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/state/PendingSplitsStateSerializerTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/state/PendingSplitsStateSerializerTest.java index 87527eb04ff..db0757f4753 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/state/PendingSplitsStateSerializerTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/state/PendingSplitsStateSerializerTest.java @@ -41,6 +41,7 @@ import java.util.List; import java.util.Map; +import static org.apache.flink.cdc.connectors.mysql.source.split.MySqlSnapshotSplit.generateSplitId; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertSame; @@ -190,7 +191,7 @@ private static MySqlSchemalessSnapshotSplit getTestSchemalessSnapshotSplit( TableId tableId, int splitNo) { return new MySqlSchemalessSnapshotSplit( tableId, - tableId.toString() + "-" + splitNo, + generateSplitId(tableId, splitNo), new RowType( Collections.singletonList(new RowType.RowField("id", new BigIntType()))), new Object[] {100L + splitNo * 1000L}, diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java index db790f18797..2f674b612d9 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java @@ -149,23 +149,11 @@ public void testRemoveTableUsingStateFromSnapshotPhase() throws Exception { snapshotSplits = Collections.singletonList( new MySqlSnapshotSplit( - tableId0, - tableId0 + ":0", - splitType, - null, - null, - null, - tableSchemas)); + tableId0, 0, splitType, null, null, null, tableSchemas)); toRemoveSplits = Collections.singletonList( new MySqlSnapshotSplit( - tableId1, - tableId1 + ":0", - splitType, - null, - null, - null, - tableSchemas)); + tableId1, 0, splitType, null, null, null, tableSchemas)); } // Step 1: start source reader and assign snapshot splits @@ -254,7 +242,7 @@ public void testFinishedUnackedSplitsUsingStateFromSnapshotPhase() throws Except Arrays.asList( new MySqlSnapshotSplit( tableId, - tableId + ":0", + 0, splitType, null, new Integer[] {200}, @@ -262,7 +250,7 @@ public void testFinishedUnackedSplitsUsingStateFromSnapshotPhase() throws Except tableSchemas), new MySqlSnapshotSplit( tableId, - tableId + ":1", + 1, splitType, new Integer[] {200}, new Integer[] {1500}, @@ -270,7 +258,7 @@ public void testFinishedUnackedSplitsUsingStateFromSnapshotPhase() throws Except tableSchemas), new MySqlSnapshotSplit( tableId, - tableId + ":2", + 2, splitType, new Integer[] {1500}, null, diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlSplitSerializerTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlSplitSerializerTest.java index 2e528094c55..78acd0712ff 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlSplitSerializerTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlSplitSerializerTest.java @@ -46,7 +46,7 @@ public void testSnapshotSplit() throws Exception { final MySqlSplit split = new MySqlSnapshotSplit( TableId.parse("test_db.test_table"), - "test_db.test_table-1", + 1, new RowType( Collections.singletonList( new RowType.RowField("id", new BigIntType()))), @@ -122,7 +122,7 @@ public void testRepeatedSerializationCache() throws Exception { final MySqlSplit split = new MySqlSnapshotSplit( TableId.parse("test_db.test_table"), - "test_db.test_table-0", + 0, new RowType( Collections.singletonList( new RowType.RowField("id", new BigIntType()))), diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlSplitStateTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlSplitStateTest.java index c0d222eaabb..f51707387f1 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlSplitStateTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlSplitStateTest.java @@ -41,7 +41,7 @@ public void testFromToSplit() { final MySqlSnapshotSplit split = new MySqlSnapshotSplit( TableId.parse("test_db.test_table"), - "test_db.test_table-1", + 1, new RowType( Collections.singletonList( new RowType.RowField("id", new BigIntType()))), @@ -58,7 +58,7 @@ public void testRecordSnapshotSplitState() { final MySqlSnapshotSplit split = new MySqlSnapshotSplit( TableId.parse("test_db.test_table"), - "test_db.test_table-1", + 1, new RowType( Collections.singletonList( new RowType.RowField("id", new BigIntType()))), @@ -73,7 +73,7 @@ public void testRecordSnapshotSplitState() { final MySqlSnapshotSplit expected = new MySqlSnapshotSplit( TableId.parse("test_db.test_table"), - "test_db.test_table-1", + 1, new RowType( Collections.singletonList( new RowType.RowField("id", new BigIntType()))),