From ea933283856b970956445c332232d60f3e18f07c Mon Sep 17 00:00:00 2001 From: kunni Date: Wed, 21 Aug 2024 13:48:14 +0800 Subject: [PATCH 1/4] [FLINK-FLINK-36115][pipeline-connector][mysql] add scan.incremental.newly-added-table.enabled option to Allow to scan newly table DDL during incremental reading stage. --- .../mysql/factory/MySqlDataSourceFactory.java | 71 ++++++++++++++----- .../mysql/source/MySqlDataSourceOptions.java | 8 +++ .../MysqlPipelineNewlyAddedTableITCase.java | 46 +++++++++++- 3 files changed, 106 insertions(+), 19 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java index 7f7691961a8..6ee8195eb4c 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java @@ -27,6 +27,7 @@ import org.apache.flink.cdc.common.schema.Selectors; import org.apache.flink.cdc.common.source.DataSource; import org.apache.flink.cdc.connectors.mysql.source.MySqlDataSource; +import org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions; import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig; import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory; import org.apache.flink.cdc.connectors.mysql.source.config.ServerIdRange; @@ -38,6 +39,7 @@ import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.catalog.ObjectPath; +import io.debezium.relational.Tables; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,6 +64,7 @@ import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PASSWORD; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PORT; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED; +import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_NEWLY_ADDED_TABLE_ENABLED; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED; @@ -128,6 +131,8 @@ public DataSource createDataSource(Context context) { int connectMaxRetries = config.get(CONNECT_MAX_RETRIES); int connectionPoolSize = config.get(CONNECTION_POOL_SIZE); boolean scanNewlyAddedTableEnabled = config.get(SCAN_NEWLY_ADDED_TABLE_ENABLED); + boolean scanIncrementalNewlyAddedTableEnabled = + config.get(SCAN_INCREMENTAL_NEWLY_ADDED_TABLE_ENABLED); validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1); validateIntegerOption(CHUNK_META_GROUP_SIZE, splitMetaGroupSize, 1); @@ -166,26 +171,32 @@ public DataSource createDataSource(Context context) { .jdbcProperties(getJdbcProperties(configMap)) .scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled); - Selectors selectors = new Selectors.SelectorsBuilder().includeTables(tables).build(); - List capturedTables = getTableList(configFactory.createConfig(0), selectors); - if (capturedTables.isEmpty()) { - throw new IllegalArgumentException( - "Cannot find any table by the option 'tables' = " + tables); - } - if (tablesExclude != null) { - Selectors selectExclude = - new Selectors.SelectorsBuilder().includeTables(tablesExclude).build(); - List excludeTables = getTableList(configFactory.createConfig(0), selectExclude); - if (!excludeTables.isEmpty()) { - capturedTables.removeAll(excludeTables); - } + if (scanIncrementalNewlyAddedTableEnabled) { + String newTables = validateTableAndReturnDebeziumStyle(tables); + configFactory.tableList(newTables); + } else { + Selectors selectors = new Selectors.SelectorsBuilder().includeTables(tables).build(); + List capturedTables = getTableList(configFactory.createConfig(0), selectors); if (capturedTables.isEmpty()) { throw new IllegalArgumentException( - "Cannot find any table with by the option 'tables.exclude' = " - + tablesExclude); + "Cannot find any table by the option 'tables' = " + tables); } + if (tablesExclude != null) { + Selectors selectExclude = + new Selectors.SelectorsBuilder().includeTables(tablesExclude).build(); + List excludeTables = + getTableList(configFactory.createConfig(0), selectExclude); + if (!excludeTables.isEmpty()) { + capturedTables.removeAll(excludeTables); + } + if (capturedTables.isEmpty()) { + throw new IllegalArgumentException( + "Cannot find any table with by the option 'tables.exclude' = " + + tablesExclude); + } + } + configFactory.tableList(capturedTables.toArray(new String[0])); } - configFactory.tableList(capturedTables.toArray(new String[0])); String chunkKeyColumns = config.get(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN); if (chunkKeyColumns != null) { @@ -256,6 +267,7 @@ public Set> optionalOptions() { options.add(CHUNK_META_GROUP_SIZE); options.add(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND); options.add(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND); + options.add(SCAN_INCREMENTAL_NEWLY_ADDED_TABLE_ENABLED); return options; } @@ -410,6 +422,33 @@ && doubleCompare(distributionFactorLower, 1.0d) <= 0, distributionFactorLower)); } + /** + * Currently, The supported regular syntax is not exactly the same in {@link Selectors} and + * {@link Tables.TableFilter}. + * + *

The main distinction are : + * + *

1) {@link Selectors} use `,` to split table names and {@link Tables.TableFilter} use use + * `|` to split table names. + * + *

2) If there is a need to use a dot (.) in a regular expression to match any character, it + * is necessary to escape the dot with a backslash, refer to {@link + * MySqlDataSourceOptions#TABLES}. + */ + private String validateTableAndReturnDebeziumStyle(String tables) { + // MySQL table names are not allowed to have `,` character. + if (tables.contains(",")) { + throw new IllegalArgumentException( + "the `,` in " + + tables + + " is not supported when " + + SCAN_INCREMENTAL_NEWLY_ADDED_TABLE_ENABLED + + " was enabled."); + } + + return tables.replace("\\.", "."); + } + /** Replaces the default timezone placeholder with session timezone, if applicable. */ private static ZoneId getServerTimeZone(Configuration config) { final String serverTimeZone = config.get(SERVER_TIME_ZONE); diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java index 9a18350b348..a1dbdec9ec1 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java @@ -261,4 +261,12 @@ public class MySqlDataSourceOptions { + "If there is a need to use a dot (.) in a regular expression to match any character, " + "it is necessary to escape the dot with a backslash." + "eg. db0.\\.*, db1.user_table_[0-9]+, db[1-2].[app|web]_order_\\.*"); + + @Experimental + public static final ConfigOption SCAN_INCREMENTAL_NEWLY_ADDED_TABLE_ENABLED = + ConfigOptions.key("scan.incremental.newly-added-table.enabled") + .booleanType() + .defaultValue(false) + .withDescription( + "Whether to scan the ddl and dml statements of newly added tables or not in incremental reading stage, by default is false."); } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MysqlPipelineNewlyAddedTableITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MysqlPipelineNewlyAddedTableITCase.java index 7187e644735..1aeb368a818 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MysqlPipelineNewlyAddedTableITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MysqlPipelineNewlyAddedTableITCase.java @@ -21,6 +21,7 @@ import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.cdc.common.data.binary.BinaryStringData; +import org.apache.flink.cdc.common.event.ChangeEvent; import org.apache.flink.cdc.common.event.CreateTableEvent; import org.apache.flink.cdc.common.event.DataChangeEvent; import org.apache.flink.cdc.common.event.Event; @@ -83,7 +84,10 @@ import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.HOSTNAME; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PASSWORD; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PORT; +import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_NEWLY_ADDED_TABLE_ENABLED; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED; +import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_STARTUP_MODE; +import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_STARTUP_TIMESTAMP_MILLIS; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SERVER_ID; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SERVER_TIME_ZONE; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES; @@ -149,6 +153,40 @@ private MySqlConnection getConnection() { return DebeziumUtils.createMySqlConnection(configuration, new Properties()); } + @Test + public void testAddNewlyTablesInIncrementalStage() throws Exception { + List tables = Collections.singletonList("address_\\.*"); + Map options = new HashMap<>(); + options.put(SCAN_INCREMENTAL_NEWLY_ADDED_TABLE_ENABLED.key(), "true"); + options.put(SCAN_STARTUP_MODE.key(), "timestamp"); + options.put( + SCAN_STARTUP_TIMESTAMP_MILLIS.key(), String.valueOf(System.currentTimeMillis())); + + FlinkSourceProvider sourceProvider = getFlinkSourceProvider(tables, 4, options); + StreamExecutionEnvironment env = + StreamExecutionEnvironment.getExecutionEnvironment(new Configuration()); + env.enableCheckpointing(200); + DataStreamSource source = + env.fromSource( + sourceProvider.getSource(), + WatermarkStrategy.noWatermarks(), + MySqlDataSourceFactory.IDENTIFIER, + new EventTypeInfo()); + + TypeSerializer serializer = + source.getTransformation().getOutputType().createSerializer(env.getConfig()); + CheckpointedCollectResultBuffer resultBuffer = + new CheckpointedCollectResultBuffer<>(serializer); + String accumulatorName = "dataStreamCollect_" + UUID.randomUUID(); + CollectResultIterator iterator = + addCollector(env, source, resultBuffer, serializer, accumulatorName); + env.executeAsync("AddNewlyTablesInIncrementalStage"); + initialAddressTables(getConnection(), Collections.singletonList("address_beijing")); + List actual = fetchResults(iterator, 4); + assertThat(((ChangeEvent) actual.get(0)).tableId()) + .isEqualTo(TableId.tableId(customDatabase.getDatabaseName(), "address_beijing")); + } + @Test public void testAddNewTableOneByOneSingleParallelism() throws Exception { TestParam testParam = @@ -228,7 +266,7 @@ private void testAddNewTable(TestParam testParam, int parallelism) throws Except List listenTablesFirstRound = testParam.getFirstRoundListenTables(); FlinkSourceProvider sourceProvider = - getFlinkSourceProvider(listenTablesFirstRound, parallelism); + getFlinkSourceProvider(listenTablesFirstRound, parallelism, new HashMap<>()); DataStreamSource source = env.fromSource( sourceProvider.getSource(), @@ -272,7 +310,7 @@ private void testAddNewTable(TestParam testParam, int parallelism) throws Except getStreamExecutionEnvironment(finishedSavePointPath, parallelism); List listenTablesSecondRound = testParam.getSecondRoundListenTables(); FlinkSourceProvider restoredSourceProvider = - getFlinkSourceProvider(listenTablesSecondRound, parallelism); + getFlinkSourceProvider(listenTablesSecondRound, parallelism, new HashMap<>()); DataStreamSource restoreSource = restoredEnv.fromSource( restoredSourceProvider.getSource(), @@ -432,7 +470,8 @@ private void initialAddressTables(JdbcConnection connection, List addres } } - private FlinkSourceProvider getFlinkSourceProvider(List tables, int parallelism) { + private FlinkSourceProvider getFlinkSourceProvider( + List tables, int parallelism, Map additionalOptions) { List fullTableNames = tables.stream() .map(table -> customDatabase.getDatabaseName() + "." + table) @@ -446,6 +485,7 @@ private FlinkSourceProvider getFlinkSourceProvider(List tables, int para options.put(TABLES.key(), StringUtils.join(fullTableNames, ",")); options.put(SERVER_ID.key(), getServerId(parallelism)); options.put(SCAN_NEWLY_ADDED_TABLE_ENABLED.key(), "true"); + options.putAll(additionalOptions); Factory.Context context = new FactoryHelper.DefaultContext( org.apache.flink.cdc.common.configuration.Configuration.fromMap(options), From 5de2c45f290789753762c7a1b92e6255122e0de6 Mon Sep 17 00:00:00 2001 From: kunni Date: Wed, 21 Aug 2024 20:22:12 +0800 Subject: [PATCH 2/4] [FLINK-FLINK-36115][pipeline-connector][mysql] add scan.binlog.newly-added-table.enabled option to Allow to scan newly table during binlog reading stage. --- .../docs/connectors/pipeline-connectors/mysql.md | 11 +++++++++++ .../docs/connectors/pipeline-connectors/mysql.md | 11 +++++++++++ .../mysql/factory/MySqlDataSourceFactory.java | 8 ++++---- .../mysql/source/MySqlDataSourceOptions.java | 9 ++++++--- .../source/MysqlPipelineNewlyAddedTableITCase.java | 8 ++++---- 5 files changed, 36 insertions(+), 11 deletions(-) diff --git a/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md b/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md index 636ed975fba..4d5edb66fd3 100644 --- a/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md +++ b/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md @@ -275,6 +275,17 @@ pipeline: Boolean 是否启用动态加表特性,默认关闭。 此配置项只有作业从savepoint/checkpoint启动时才生效。 + + scan.binlog.newly-added-table.enabled + optional + false + Boolean + 在 binlog 读取阶段,是否读取新增表的表结构变更和数据变更,默认值是 false。
+ scan.newly-added-table.enabled 和 scan.binlog.newly-added-table.enabled 参数的不同在于:
+ scan.newly-added-table.enabled: 在作业重启后,对新增表的全量和增量数据进行读取;
+ scan.binlog.newly-added-table.enabled: 只在 binlog 读取阶段读取新增表的增量数据。 + + diff --git a/docs/content/docs/connectors/pipeline-connectors/mysql.md b/docs/content/docs/connectors/pipeline-connectors/mysql.md index 879701614e0..819da0d0900 100644 --- a/docs/content/docs/connectors/pipeline-connectors/mysql.md +++ b/docs/content/docs/connectors/pipeline-connectors/mysql.md @@ -282,6 +282,17 @@ pipeline: Boolean Whether to enable scan the newly added tables feature or not, by default is false. This option is only useful when we start the job from a savepoint/checkpoint. + + scan.binlog.newly-added-table.enabled + optional + false + Boolean + In binlog reading stage, whether to scan the ddl and dml statements of newly added tables or not, by default is false.
+ The difference between scan.newly-added-table.enabled and scan.binlog.newly-added-table.enabled options is:
+ scan.newly-added-table.enabled: do re-snapshot & binlog-reading for newly added table when restore;
+ scan.binlog.newly-added-table.enabled: only do binlog-reading for newly added table during binlog reading phase. + + diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java index 6ee8195eb4c..1204b83db40 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java @@ -63,8 +63,8 @@ import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.HOSTNAME; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PASSWORD; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PORT; +import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED; -import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_NEWLY_ADDED_TABLE_ENABLED; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED; @@ -132,7 +132,7 @@ public DataSource createDataSource(Context context) { int connectionPoolSize = config.get(CONNECTION_POOL_SIZE); boolean scanNewlyAddedTableEnabled = config.get(SCAN_NEWLY_ADDED_TABLE_ENABLED); boolean scanIncrementalNewlyAddedTableEnabled = - config.get(SCAN_INCREMENTAL_NEWLY_ADDED_TABLE_ENABLED); + config.get(SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED); validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1); validateIntegerOption(CHUNK_META_GROUP_SIZE, splitMetaGroupSize, 1); @@ -267,7 +267,7 @@ public Set> optionalOptions() { options.add(CHUNK_META_GROUP_SIZE); options.add(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND); options.add(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND); - options.add(SCAN_INCREMENTAL_NEWLY_ADDED_TABLE_ENABLED); + options.add(SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED); return options; } @@ -442,7 +442,7 @@ private String validateTableAndReturnDebeziumStyle(String tables) { "the `,` in " + tables + " is not supported when " - + SCAN_INCREMENTAL_NEWLY_ADDED_TABLE_ENABLED + + SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED + " was enabled."); } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java index a1dbdec9ec1..bc9d8b7bfd7 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java @@ -263,10 +263,13 @@ public class MySqlDataSourceOptions { + "eg. db0.\\.*, db1.user_table_[0-9]+, db[1-2].[app|web]_order_\\.*"); @Experimental - public static final ConfigOption SCAN_INCREMENTAL_NEWLY_ADDED_TABLE_ENABLED = - ConfigOptions.key("scan.incremental.newly-added-table.enabled") + public static final ConfigOption SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED = + ConfigOptions.key("scan.binlog.newly-added-table.enabled") .booleanType() .defaultValue(false) .withDescription( - "Whether to scan the ddl and dml statements of newly added tables or not in incremental reading stage, by default is false."); + "In binlog reading stage, whether to scan the ddl and dml statements of newly added tables or not, by default is false. \n" + + "The difference between scan.newly-added-table.enabled and scan.binlog.newly-added-table.enabled options is: \n" + + "scan.newly-added-table.enabled: do re-snapshot & binlog-reading for newly added table when restore; \n" + + "scan.binlog.newly-added-table.enabled: only do binlog-reading for newly added table during binlog reading phase."); } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MysqlPipelineNewlyAddedTableITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MysqlPipelineNewlyAddedTableITCase.java index 1aeb368a818..4cc1e952ab7 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MysqlPipelineNewlyAddedTableITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MysqlPipelineNewlyAddedTableITCase.java @@ -84,7 +84,7 @@ import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.HOSTNAME; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PASSWORD; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PORT; -import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_NEWLY_ADDED_TABLE_ENABLED; +import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_STARTUP_MODE; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_STARTUP_TIMESTAMP_MILLIS; @@ -154,10 +154,10 @@ private MySqlConnection getConnection() { } @Test - public void testAddNewlyTablesInIncrementalStage() throws Exception { + public void testScanBinlogNewlyAddedTableEnabled() throws Exception { List tables = Collections.singletonList("address_\\.*"); Map options = new HashMap<>(); - options.put(SCAN_INCREMENTAL_NEWLY_ADDED_TABLE_ENABLED.key(), "true"); + options.put(SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED.key(), "true"); options.put(SCAN_STARTUP_MODE.key(), "timestamp"); options.put( SCAN_STARTUP_TIMESTAMP_MILLIS.key(), String.valueOf(System.currentTimeMillis())); @@ -180,7 +180,7 @@ public void testAddNewlyTablesInIncrementalStage() throws Exception { String accumulatorName = "dataStreamCollect_" + UUID.randomUUID(); CollectResultIterator iterator = addCollector(env, source, resultBuffer, serializer, accumulatorName); - env.executeAsync("AddNewlyTablesInIncrementalStage"); + env.executeAsync("AddNewlyTablesWhenReadingBinlog"); initialAddressTables(getConnection(), Collections.singletonList("address_beijing")); List actual = fetchResults(iterator, 4); assertThat(((ChangeEvent) actual.get(0)).tableId()) From 7173b46a203debeb0541777852eadf00d6228fe6 Mon Sep 17 00:00:00 2001 From: kunni Date: Wed, 21 Aug 2024 20:23:46 +0800 Subject: [PATCH 3/4] [FLINK-FLINK-36115][pipeline-connector][mysql] add scan.incremental.newly-added-table.enabled option to Allow to scan newly table DDL during incremental reading stage. --- docs/content/docs/connectors/pipeline-connectors/mysql.md | 2 +- .../cdc/connectors/mysql/source/MySqlDataSourceOptions.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/content/docs/connectors/pipeline-connectors/mysql.md b/docs/content/docs/connectors/pipeline-connectors/mysql.md index 819da0d0900..36c4cc7708b 100644 --- a/docs/content/docs/connectors/pipeline-connectors/mysql.md +++ b/docs/content/docs/connectors/pipeline-connectors/mysql.md @@ -289,7 +289,7 @@ pipeline: Boolean In binlog reading stage, whether to scan the ddl and dml statements of newly added tables or not, by default is false.
The difference between scan.newly-added-table.enabled and scan.binlog.newly-added-table.enabled options is:
- scan.newly-added-table.enabled: do re-snapshot & binlog-reading for newly added table when restore;
+ scan.newly-added-table.enabled: do re-snapshot & binlog-reading for newly added table when restored;
scan.binlog.newly-added-table.enabled: only do binlog-reading for newly added table during binlog reading phase. diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java index bc9d8b7bfd7..580d370b5aa 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java @@ -270,6 +270,6 @@ public class MySqlDataSourceOptions { .withDescription( "In binlog reading stage, whether to scan the ddl and dml statements of newly added tables or not, by default is false. \n" + "The difference between scan.newly-added-table.enabled and scan.binlog.newly-added-table.enabled options is: \n" - + "scan.newly-added-table.enabled: do re-snapshot & binlog-reading for newly added table when restore; \n" + + "scan.newly-added-table.enabled: do re-snapshot & binlog-reading for newly added table when restored; \n" + "scan.binlog.newly-added-table.enabled: only do binlog-reading for newly added table during binlog reading phase."); } From a0cbf7cfbe818d85940aa6977e7a636aa466300d Mon Sep 17 00:00:00 2001 From: kunni Date: Wed, 21 Aug 2024 22:24:34 +0800 Subject: [PATCH 4/4] [FLINK-FLINK-36115][pipeline-connector][mysql] add scan.binlog.newly-added-table.enabled option to Allow to scan newly table during binlog reading stage. --- .../cdc/connectors/mysql/factory/MySqlDataSourceFactory.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java index 1204b83db40..5f538eeb1c5 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java @@ -131,7 +131,7 @@ public DataSource createDataSource(Context context) { int connectMaxRetries = config.get(CONNECT_MAX_RETRIES); int connectionPoolSize = config.get(CONNECTION_POOL_SIZE); boolean scanNewlyAddedTableEnabled = config.get(SCAN_NEWLY_ADDED_TABLE_ENABLED); - boolean scanIncrementalNewlyAddedTableEnabled = + boolean scanBinlogNewlyAddedTableEnabled = config.get(SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED); validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1); @@ -171,7 +171,7 @@ public DataSource createDataSource(Context context) { .jdbcProperties(getJdbcProperties(configMap)) .scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled); - if (scanIncrementalNewlyAddedTableEnabled) { + if (scanBinlogNewlyAddedTableEnabled) { String newTables = validateTableAndReturnDebeziumStyle(tables); configFactory.tableList(newTables); } else {