diff --git a/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md b/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md index 636ed975fba..4d5edb66fd3 100644 --- a/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md +++ b/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md @@ -275,6 +275,17 @@ pipeline: Boolean 是否启用动态加表特性,默认关闭。 此配置项只有作业从savepoint/checkpoint启动时才生效。 + + scan.binlog.newly-added-table.enabled + optional + false + Boolean + 在 binlog 读取阶段,是否读取新增表的表结构变更和数据变更,默认值是 false。
+ scan.newly-added-table.enabled 和 scan.binlog.newly-added-table.enabled 参数的不同在于:
+ scan.newly-added-table.enabled: 在作业重启后,对新增表的全量和增量数据进行读取;
+ scan.binlog.newly-added-table.enabled: 只在 binlog 读取阶段读取新增表的增量数据。 + + diff --git a/docs/content/docs/connectors/pipeline-connectors/mysql.md b/docs/content/docs/connectors/pipeline-connectors/mysql.md index 879701614e0..36c4cc7708b 100644 --- a/docs/content/docs/connectors/pipeline-connectors/mysql.md +++ b/docs/content/docs/connectors/pipeline-connectors/mysql.md @@ -282,6 +282,17 @@ pipeline: Boolean Whether to enable scan the newly added tables feature or not, by default is false. This option is only useful when we start the job from a savepoint/checkpoint. + + scan.binlog.newly-added-table.enabled + optional + false + Boolean + In binlog reading stage, whether to scan the ddl and dml statements of newly added tables or not, by default is false.
+ The difference between scan.newly-added-table.enabled and scan.binlog.newly-added-table.enabled options is:
+ scan.newly-added-table.enabled: do re-snapshot & binlog-reading for newly added table when restored;
+ scan.binlog.newly-added-table.enabled: only do binlog-reading for newly added table during binlog reading phase. + + diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java index 7f7691961a8..5f538eeb1c5 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java @@ -27,6 +27,7 @@ import org.apache.flink.cdc.common.schema.Selectors; import org.apache.flink.cdc.common.source.DataSource; import org.apache.flink.cdc.connectors.mysql.source.MySqlDataSource; +import org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions; import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig; import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory; import org.apache.flink.cdc.connectors.mysql.source.config.ServerIdRange; @@ -38,6 +39,7 @@ import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.catalog.ObjectPath; +import io.debezium.relational.Tables; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,6 +63,7 @@ import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.HOSTNAME; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PASSWORD; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PORT; +import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE; @@ -128,6 +131,8 @@ public DataSource createDataSource(Context context) { int connectMaxRetries = config.get(CONNECT_MAX_RETRIES); int connectionPoolSize = config.get(CONNECTION_POOL_SIZE); boolean scanNewlyAddedTableEnabled = config.get(SCAN_NEWLY_ADDED_TABLE_ENABLED); + boolean scanBinlogNewlyAddedTableEnabled = + config.get(SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED); validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1); validateIntegerOption(CHUNK_META_GROUP_SIZE, splitMetaGroupSize, 1); @@ -166,26 +171,32 @@ public DataSource createDataSource(Context context) { .jdbcProperties(getJdbcProperties(configMap)) .scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled); - Selectors selectors = new Selectors.SelectorsBuilder().includeTables(tables).build(); - List capturedTables = getTableList(configFactory.createConfig(0), selectors); - if (capturedTables.isEmpty()) { - throw new IllegalArgumentException( - "Cannot find any table by the option 'tables' = " + tables); - } - if (tablesExclude != null) { - Selectors selectExclude = - new Selectors.SelectorsBuilder().includeTables(tablesExclude).build(); - List excludeTables = getTableList(configFactory.createConfig(0), selectExclude); - if (!excludeTables.isEmpty()) { - capturedTables.removeAll(excludeTables); - } + if (scanBinlogNewlyAddedTableEnabled) { + String newTables = validateTableAndReturnDebeziumStyle(tables); + configFactory.tableList(newTables); + } else { + Selectors selectors = new 
Selectors.SelectorsBuilder().includeTables(tables).build(); + List capturedTables = getTableList(configFactory.createConfig(0), selectors); if (capturedTables.isEmpty()) { throw new IllegalArgumentException( - "Cannot find any table with by the option 'tables.exclude' = " - + tablesExclude); + "Cannot find any table by the option 'tables' = " + tables); } + if (tablesExclude != null) { + Selectors selectExclude = + new Selectors.SelectorsBuilder().includeTables(tablesExclude).build(); + List excludeTables = + getTableList(configFactory.createConfig(0), selectExclude); + if (!excludeTables.isEmpty()) { + capturedTables.removeAll(excludeTables); + } + if (capturedTables.isEmpty()) { + throw new IllegalArgumentException( + "Cannot find any table with by the option 'tables.exclude' = " + + tablesExclude); + } + } + configFactory.tableList(capturedTables.toArray(new String[0])); } - configFactory.tableList(capturedTables.toArray(new String[0])); String chunkKeyColumns = config.get(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN); if (chunkKeyColumns != null) { @@ -256,6 +267,7 @@ public Set> optionalOptions() { options.add(CHUNK_META_GROUP_SIZE); options.add(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND); options.add(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND); + options.add(SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED); return options; } @@ -410,6 +422,33 @@ && doubleCompare(distributionFactorLower, 1.0d) <= 0, distributionFactorLower)); } + /** + * Currently, The supported regular syntax is not exactly the same in {@link Selectors} and + * {@link Tables.TableFilter}. + * + *

The main distinctions are: + * + *

1) {@link Selectors} uses `,` to split table names and {@link Tables.TableFilter} uses + * `|` to split table names. + * + *

2) If there is a need to use a dot (.) in a regular expression to match any character, it + * is necessary to escape the dot with a backslash, refer to {@link + * MySqlDataSourceOptions#TABLES}. + */ + private String validateTableAndReturnDebeziumStyle(String tables) { + // MySQL table names are not allowed to have `,` character. + if (tables.contains(",")) { + throw new IllegalArgumentException( + "the `,` in " + + tables + + " is not supported when " + + SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED + + " was enabled."); + } + + return tables.replace("\\.", "."); + } + /** Replaces the default timezone placeholder with session timezone, if applicable. */ private static ZoneId getServerTimeZone(Configuration config) { final String serverTimeZone = config.get(SERVER_TIME_ZONE); diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java index 9a18350b348..580d370b5aa 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java @@ -261,4 +261,15 @@ public class MySqlDataSourceOptions { + "If there is a need to use a dot (.) in a regular expression to match any character, " + "it is necessary to escape the dot with a backslash." + "eg. 
db0.\\.*, db1.user_table_[0-9]+, db[1-2].[app|web]_order_\\.*"); + + @Experimental + public static final ConfigOption SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED = + ConfigOptions.key("scan.binlog.newly-added-table.enabled") + .booleanType() + .defaultValue(false) + .withDescription( + "In binlog reading stage, whether to scan the ddl and dml statements of newly added tables or not, by default is false. \n" + + "The difference between scan.newly-added-table.enabled and scan.binlog.newly-added-table.enabled options is: \n" + + "scan.newly-added-table.enabled: do re-snapshot & binlog-reading for newly added table when restored; \n" + + "scan.binlog.newly-added-table.enabled: only do binlog-reading for newly added table during binlog reading phase."); } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MysqlPipelineNewlyAddedTableITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MysqlPipelineNewlyAddedTableITCase.java index 7187e644735..4cc1e952ab7 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MysqlPipelineNewlyAddedTableITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MysqlPipelineNewlyAddedTableITCase.java @@ -21,6 +21,7 @@ import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.cdc.common.data.binary.BinaryStringData; +import org.apache.flink.cdc.common.event.ChangeEvent; import org.apache.flink.cdc.common.event.CreateTableEvent; import org.apache.flink.cdc.common.event.DataChangeEvent; import org.apache.flink.cdc.common.event.Event; @@ -83,7 +84,10 
@@ import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.HOSTNAME; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PASSWORD; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PORT; +import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED; +import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_STARTUP_MODE; +import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_STARTUP_TIMESTAMP_MILLIS; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SERVER_ID; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SERVER_TIME_ZONE; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES; @@ -149,6 +153,40 @@ private MySqlConnection getConnection() { return DebeziumUtils.createMySqlConnection(configuration, new Properties()); } + @Test + public void testScanBinlogNewlyAddedTableEnabled() throws Exception { + List tables = Collections.singletonList("address_\\.*"); + Map options = new HashMap<>(); + options.put(SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED.key(), "true"); + options.put(SCAN_STARTUP_MODE.key(), "timestamp"); + options.put( + SCAN_STARTUP_TIMESTAMP_MILLIS.key(), String.valueOf(System.currentTimeMillis())); + + FlinkSourceProvider sourceProvider = getFlinkSourceProvider(tables, 4, options); + StreamExecutionEnvironment env = + StreamExecutionEnvironment.getExecutionEnvironment(new Configuration()); + env.enableCheckpointing(200); + DataStreamSource source = + env.fromSource( + sourceProvider.getSource(), + WatermarkStrategy.noWatermarks(), + MySqlDataSourceFactory.IDENTIFIER, + new EventTypeInfo()); + + TypeSerializer serializer = + 
source.getTransformation().getOutputType().createSerializer(env.getConfig()); + CheckpointedCollectResultBuffer resultBuffer = + new CheckpointedCollectResultBuffer<>(serializer); + String accumulatorName = "dataStreamCollect_" + UUID.randomUUID(); + CollectResultIterator iterator = + addCollector(env, source, resultBuffer, serializer, accumulatorName); + env.executeAsync("AddNewlyTablesWhenReadingBinlog"); + initialAddressTables(getConnection(), Collections.singletonList("address_beijing")); + List actual = fetchResults(iterator, 4); + assertThat(((ChangeEvent) actual.get(0)).tableId()) + .isEqualTo(TableId.tableId(customDatabase.getDatabaseName(), "address_beijing")); + } + @Test public void testAddNewTableOneByOneSingleParallelism() throws Exception { TestParam testParam = @@ -228,7 +266,7 @@ private void testAddNewTable(TestParam testParam, int parallelism) throws Except List listenTablesFirstRound = testParam.getFirstRoundListenTables(); FlinkSourceProvider sourceProvider = - getFlinkSourceProvider(listenTablesFirstRound, parallelism); + getFlinkSourceProvider(listenTablesFirstRound, parallelism, new HashMap<>()); DataStreamSource source = env.fromSource( sourceProvider.getSource(), @@ -272,7 +310,7 @@ private void testAddNewTable(TestParam testParam, int parallelism) throws Except getStreamExecutionEnvironment(finishedSavePointPath, parallelism); List listenTablesSecondRound = testParam.getSecondRoundListenTables(); FlinkSourceProvider restoredSourceProvider = - getFlinkSourceProvider(listenTablesSecondRound, parallelism); + getFlinkSourceProvider(listenTablesSecondRound, parallelism, new HashMap<>()); DataStreamSource restoreSource = restoredEnv.fromSource( restoredSourceProvider.getSource(), @@ -432,7 +470,8 @@ private void initialAddressTables(JdbcConnection connection, List addres } } - private FlinkSourceProvider getFlinkSourceProvider(List tables, int parallelism) { + private FlinkSourceProvider getFlinkSourceProvider( + List tables, int parallelism, 
Map additionalOptions) { List fullTableNames = tables.stream() .map(table -> customDatabase.getDatabaseName() + "." + table) @@ -446,6 +485,7 @@ private FlinkSourceProvider getFlinkSourceProvider(List tables, int para options.put(TABLES.key(), StringUtils.join(fullTableNames, ",")); options.put(SERVER_ID.key(), getServerId(parallelism)); options.put(SCAN_NEWLY_ADDED_TABLE_ENABLED.key(), "true"); + options.putAll(additionalOptions); Factory.Context context = new FactoryHelper.DefaultContext( org.apache.flink.cdc.common.configuration.Configuration.fromMap(options),