From 1dd101d634cc78f9603f477d0ef771cf5f7325d2 Mon Sep 17 00:00:00 2001 From: North Lin <37775475+qg-lin@users.noreply.github.com> Date: Sat, 14 Sep 2024 19:48:42 +0800 Subject: [PATCH 1/9] [FLINK-36282][pipeline-connector][cdc-connector][mysql]fix incorrect data type of TINYINT(1) in mysql pipeline connector --- .../mysql/source/MySqlDataSource.java | 4 +- .../mysql/source/MySqlEventDeserializer.java | 9 +- .../CustomAlterTableParserListener.java | 12 ++- .../parser/CustomMySqlAntlrDdlParser.java | 7 +- .../CustomMySqlAntlrDdlParserListener.java | 9 +- .../reader/MySqlPipelineRecordEmitter.java | 3 +- .../mysql/utils/MySqlSchemaUtils.java | 13 +-- .../mysql/utils/MySqlTypeUtils.java | 17 +++- .../source/MySqlMetadataAccessorITCase.java | 51 ++++++++--- .../mysql/source/MySqlPipelineITCase.java | 86 +++++++++++++++++-- .../debezium/reader/BinlogSplitReader.java | 3 +- .../mysql/schema/MySqlTypeUtils.java | 17 +++- .../source/assigners/MySqlChunkSplitter.java | 10 ++- .../mysql/source/utils/ChunkUtils.java | 11 ++- .../MySqlSnapshotSplitAssignerTest.java | 14 ++- 15 files changed, 202 insertions(+), 64 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSource.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSource.java index d1dc487c04e..bd9133c9d7e 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSource.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSource.java @@ -45,7 +45,9 @@ public MySqlDataSource(MySqlSourceConfigFactory configFactory) { public EventSourceProvider getEventSourceProvider() { MySqlEventDeserializer deserializer = new MySqlEventDeserializer( - DebeziumChangelogMode.ALL, sourceConfig.isIncludeSchemaChanges()); + DebeziumChangelogMode.ALL, + sourceConfig.isIncludeSchemaChanges(), + sourceConfig.getJdbcProperties()); MySqlSource source = new MySqlSource<>( diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlEventDeserializer.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlEventDeserializer.java index 548e603fa5a..210ab98f575 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlEventDeserializer.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlEventDeserializer.java @@ -44,6 +44,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Properties; import static org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils.getHistoryRecord; @@ -59,21 +60,25 @@ public class MySqlEventDeserializer extends DebeziumEventDeserializationSchema { private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private final boolean includeSchemaChanges; + private final Properties jdbcProperties; private transient Tables tables; private transient CustomMySqlAntlrDdlParser customParser; public MySqlEventDeserializer( - DebeziumChangelogMode changelogMode, boolean includeSchemaChanges) { + DebeziumChangelogMode changelogMode, + boolean includeSchemaChanges, + Properties jdbcProperties) { super(new MySqlSchemaDataTypeInference(), changelogMode); this.includeSchemaChanges = includeSchemaChanges; + this.jdbcProperties = jdbcProperties; } @Override protected List deserializeSchemaChangeRecord(SourceRecord record) { if (includeSchemaChanges) { if (customParser == null) { - customParser = new CustomMySqlAntlrDdlParser(); + customParser = new CustomMySqlAntlrDdlParser(jdbcProperties); tables = new Tables(); } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java index 3b30b3c4940..e8a3cf06841 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java @@ -46,6 +46,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Properties; import java.util.stream.Collectors; import static org.apache.flink.cdc.connectors.mysql.utils.MySqlTypeUtils.fromDbzColumn; @@ -60,6 +61,7 @@ public class CustomAlterTableParserListener extends MySqlParserBaseListener { private final MySqlAntlrDdlParser parser; private final List listeners; private final LinkedList changes; + private final Properties jdbcProperties; private org.apache.flink.cdc.common.event.TableId currentTable; private List columnEditors; private CustomColumnDefinitionParserListener columnDefinitionListener; @@ -70,10 +72,12 @@ public class CustomAlterTableParserListener extends MySqlParserBaseListener { public CustomAlterTableParserListener( MySqlAntlrDdlParser parser, List listeners, - LinkedList changes) { + LinkedList changes, + Properties jdbcProperties) { this.parser = parser; this.listeners = listeners; this.changes = changes; + this.jdbcProperties = jdbcProperties; } @Override @@ -315,7 +319,7 @@ public void exitAlterByChangeColumn(MySqlParser.AlterByChangeColumnContext ctx) String newColumnName = parser.parseName(ctx.newColumn); Map typeMapping = new HashMap<>(); - typeMapping.put(column.name(), fromDbzColumn(column)); + typeMapping.put(column.name(), fromDbzColumn(column, jdbcProperties)); changes.add(new AlterColumnTypeEvent(currentTable, typeMapping)); if (newColumnName != null && !column.name().equalsIgnoreCase(newColumnName)) { @@ -366,7 +370,7 @@ public void exitAlterByModifyColumn(MySqlParser.AlterByModifyColumnContext ctx) () -> { Column column = columnDefinitionListener.getColumn(); Map typeMapping = new HashMap<>(); - typeMapping.put(column.name(), fromDbzColumn(column)); + typeMapping.put(column.name(), fromDbzColumn(column, jdbcProperties)); changes.add(new AlterColumnTypeEvent(currentTable, typeMapping)); listeners.remove(columnDefinitionListener); }, @@ -413,7 +417,7 @@ public void exitDropTable(MySqlParser.DropTableContext ctx) { private org.apache.flink.cdc.common.schema.Column toCdcColumn(Column dbzColumn) { return org.apache.flink.cdc.common.schema.Column.physicalColumn( dbzColumn.name(), - fromDbzColumn(dbzColumn), + fromDbzColumn(dbzColumn, jdbcProperties), dbzColumn.comment(), dbzColumn.defaultValueExpression().orElse(null)); } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomMySqlAntlrDdlParser.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomMySqlAntlrDdlParser.java index 1264aa8d683..2456c93f18e 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomMySqlAntlrDdlParser.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomMySqlAntlrDdlParser.java @@ -29,15 +29,18 @@ import java.util.Arrays; import java.util.LinkedList; import java.util.List; +import java.util.Properties; /** A ddl parser that will use custom listener. */ public class CustomMySqlAntlrDdlParser extends MySqlAntlrDdlParser { private final LinkedList parsedEvents; + private final Properties jdbcProperties; - public CustomMySqlAntlrDdlParser() { + public CustomMySqlAntlrDdlParser(Properties jdbcProperties) { super(); this.parsedEvents = new LinkedList<>(); + this.jdbcProperties = jdbcProperties; } // Overriding this method because the BIT type requires default length dimension of 1. @@ -277,7 +280,7 @@ protected DataTypeResolver initializeDataTypeResolver() { @Override protected AntlrDdlParserListener createParseTreeWalkerListener() { - return new CustomMySqlAntlrDdlParserListener(this, parsedEvents); + return new CustomMySqlAntlrDdlParserListener(this, parsedEvents, jdbcProperties); } public List getAndClearParsedEvents() { diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomMySqlAntlrDdlParserListener.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomMySqlAntlrDdlParserListener.java index 445373309fa..c9400b3f11f 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomMySqlAntlrDdlParserListener.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomMySqlAntlrDdlParserListener.java @@ -47,6 +47,7 @@ import java.util.Collection; import java.util.LinkedList; import java.util.List; +import java.util.Properties; import java.util.concurrent.CopyOnWriteArrayList; /** @@ -74,12 +75,16 @@ public class CustomMySqlAntlrDdlParserListener extends MySqlParserBaseListener private final Collection errors = new ArrayList<>(); public CustomMySqlAntlrDdlParserListener( - MySqlAntlrDdlParser parser, LinkedList parsedEvents) { + MySqlAntlrDdlParser parser, + LinkedList parsedEvents, + Properties jdbcProperties) { // initialize listeners listeners.add(new CreateAndAlterDatabaseParserListener(parser)); listeners.add(new DropDatabaseParserListener(parser)); listeners.add(new CreateTableParserListener(parser, listeners)); - listeners.add(new CustomAlterTableParserListener(parser, listeners, parsedEvents)); + listeners.add( + new CustomAlterTableParserListener( + parser, listeners, parsedEvents, jdbcProperties)); listeners.add(new DropTableParserListener(parser)); listeners.add(new RenameTableParserListener(parser)); listeners.add(new TruncateTableParserListener(parser)); diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java index 4f801e0fa15..8602ae6d5ac 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java @@ -201,7 +201,8 @@ private Schema parseDDL(String ddlStatement, TableId tableId) { Column column = columns.get(i); String colName = column.name(); - DataType dataType = MySqlTypeUtils.fromDbzColumn(column); + DataType dataType = + MySqlTypeUtils.fromDbzColumn(column, sourceConfig.getJdbcProperties()); if (!column.isOptional()) { dataType = dataType.notNull(); } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlSchemaUtils.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlSchemaUtils.java index bc4135cd1fb..7cdfdaa433a 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlSchemaUtils.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlSchemaUtils.java @@ -37,6 +37,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Properties; import java.util.stream.Collectors; import static org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils.createMySqlConnection; @@ -129,14 +130,14 @@ public static Schema getTableSchema( new MySqlSchema(sourceConfig, jdbc.isTableIdCaseSensitive())) { TableChanges.TableChange tableSchema = mySqlSchema.getTableSchema(partition, jdbc, toDbzTableId(tableId)); - return toSchema(tableSchema.getTable()); + return toSchema(tableSchema.getTable(), sourceConfig.getJdbcProperties()); } } - public static Schema toSchema(Table table) { + public static Schema toSchema(Table table, Properties jdbcProperties) { List columns = table.columns().stream() - .map(MySqlSchemaUtils::toColumn) + .map(column -> toColumn(column, jdbcProperties)) .collect(Collectors.toList()); return Schema.newBuilder() @@ -146,9 +147,11 @@ public static Schema toSchema(Table table) { .build(); } - public static Column toColumn(io.debezium.relational.Column column) { + public static Column toColumn(io.debezium.relational.Column column, Properties jdbcProperties) { return Column.physicalColumn( - column.name(), MySqlTypeUtils.fromDbzColumn(column), column.comment()); + column.name(), + MySqlTypeUtils.fromDbzColumn(column, jdbcProperties), + column.comment()); } public static io.debezium.relational.TableId toDbzTableId(TableId tableId) { diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlTypeUtils.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlTypeUtils.java index 0e70ed6d91f..e3b9a5c0bd9 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlTypeUtils.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlTypeUtils.java @@ -22,8 +22,11 @@ import org.apache.flink.cdc.common.types.DataType; import org.apache.flink.cdc.common.types.DataTypes; +import com.mysql.cj.conf.PropertyKey; import io.debezium.relational.Column; +import java.util.Properties; + /** Utilities for converting from MySQL types to {@link DataType}s. */ public class MySqlTypeUtils { @@ -109,8 +112,8 @@ public class MySqlTypeUtils { private static final String UNKNOWN = "UNKNOWN"; /** Returns a corresponding Flink data type from a debezium {@link Column}. */ - public static DataType fromDbzColumn(Column column) { - DataType dataType = convertFromColumn(column); + public static DataType fromDbzColumn(Column column, Properties jdbcProperties) { + DataType dataType = convertFromColumn(column, jdbcProperties); if (column.isOptional()) { return dataType; } else { @@ -122,7 +125,7 @@ public static DataType fromDbzColumn(Column column) { * Returns a corresponding Flink data type from a debezium {@link Column} with nullable always * be true. */ - private static DataType convertFromColumn(Column column) { + private static DataType convertFromColumn(Column column, Properties jdbcProperties) { String typeName = column.typeName(); switch (typeName) { case BIT: @@ -137,7 +140,13 @@ private static DataType convertFromColumn(Column column) { // user should not use tinyint(1) to store number although jdbc url parameter // tinyInt1isBit=false can help change the return value, it's not a general way // btw: mybatis and mysql-connector-java map tinyint(1) to boolean by default - return column.length() == 1 ? DataTypes.BOOLEAN() : DataTypes.TINYINT(); + boolean tinyInt1isBit = + Boolean.parseBoolean( + jdbcProperties.getProperty( + PropertyKey.tinyInt1isBit.getKeyName(), "true")); + return (column.length() == 1 && tinyInt1isBit) + ? DataTypes.BOOLEAN() + : DataTypes.TINYINT(); case TINYINT_UNSIGNED: case TINYINT_UNSIGNED_ZEROFILL: case SMALLINT: diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlMetadataAccessorITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlMetadataAccessorITCase.java index 3d3f0276b69..1f26930808c 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlMetadataAccessorITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlMetadataAccessorITCase.java @@ -32,6 +32,7 @@ import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import com.mysql.cj.conf.PropertyKey; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; @@ -41,6 +42,7 @@ import java.time.ZoneId; import java.util.Arrays; import java.util.List; +import java.util.Properties; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -102,13 +104,23 @@ public void testMysql8AccessDatabaseAndTable() { } @Test - public void testMysql57AccessCommonTypesSchema() { - testAccessCommonTypesSchema(fullTypesMySql57Database); + public void testMysql57AccessCommonTypesSchemaTinyintIsBit() { + testAccessCommonTypesSchema(fullTypesMySql57Database, true); } @Test - public void testMysql8AccessCommonTypesSchema() { - testAccessCommonTypesSchema(fullTypesMySql8Database); + public void testMysql57AccessCommonTypesSchemaTinyintIsNotBit() { + testAccessCommonTypesSchema(fullTypesMySql57Database, false); + } + + @Test + public void testMysql8AccessCommonTypesSchemaTinyintIsBit() { + testAccessCommonTypesSchema(fullTypesMySql8Database, true); + } + + @Test + public void testMysql8AccessCommonTypesSchemaTinyintIsNotBit() { + testAccessCommonTypesSchema(fullTypesMySql8Database, false); } @Test @@ -117,7 +129,7 @@ public void testMysql57AccessTimeTypesSchema() { String[] tables = new String[] {"time_types"}; MySqlMetadataAccessor metadataAccessor = - getMetadataAccessor(tables, fullTypesMySql57Database); + getMetadataAccessor(tables, fullTypesMySql57Database, true); Schema actualSchema = metadataAccessor.getTableSchema( @@ -163,7 +175,7 @@ public void testMysql8AccessTimeTypesSchema() { String[] tables = new String[] {"time_types"}; MySqlMetadataAccessor metadataAccessor = - getMetadataAccessor(tables, fullTypesMySql8Database); + getMetadataAccessor(tables, fullTypesMySql8Database, true); Schema actualSchema = metadataAccessor.getTableSchema( @@ -211,7 +223,7 @@ private void testAccessDatabaseAndTable(UniqueDatabase database) { database.createAndInitialize(); String[] tables = new String[] {"common_types", "time_types", "precision_types"}; - MySqlMetadataAccessor metadataAccessor = getMetadataAccessor(tables, database); + MySqlMetadataAccessor metadataAccessor = getMetadataAccessor(tables, database, true); assertThatThrownBy(metadataAccessor::listNamespaces) .isInstanceOf(UnsupportedOperationException.class); @@ -227,11 +239,12 @@ private void testAccessDatabaseAndTable(UniqueDatabase database) { assertThat(actualTables).containsExactlyInAnyOrderElementsOf(expectedTables); } - private void testAccessCommonTypesSchema(UniqueDatabase database) { + private void testAccessCommonTypesSchema(UniqueDatabase database, boolean tinyint1IsBit) { database.createAndInitialize(); String[] tables = new String[] {"common_types"}; - MySqlMetadataAccessor metadataAccessor = getMetadataAccessor(tables, database); + MySqlMetadataAccessor metadataAccessor = + getMetadataAccessor(tables, database, tinyint1IsBit); Schema actualSchema = metadataAccessor.getTableSchema( @@ -277,8 +290,12 @@ private void testAccessCommonTypesSchema(UniqueDatabase database) { DataTypes.STRING(), DataTypes.BOOLEAN(), DataTypes.BINARY(1), - DataTypes.BOOLEAN(), - DataTypes.BOOLEAN(), + tinyint1IsBit + ? DataTypes.BOOLEAN() + : DataTypes.TINYINT(), + tinyint1IsBit + ? DataTypes.BOOLEAN() + : DataTypes.TINYINT(), DataTypes.BINARY(16), DataTypes.BINARY(8), DataTypes.STRING(), @@ -357,17 +374,22 @@ private void testAccessCommonTypesSchema(UniqueDatabase database) { assertThat(actualSchema).isEqualTo(expectedSchema); } - private MySqlMetadataAccessor getMetadataAccessor(String[] tables, UniqueDatabase database) { - MySqlSourceConfig sourceConfig = getConfig(tables, database); + private MySqlMetadataAccessor getMetadataAccessor( + String[] tables, UniqueDatabase database, boolean tinyint1IsBit) { + MySqlSourceConfig sourceConfig = getConfig(tables, database, tinyint1IsBit); return new MySqlMetadataAccessor(sourceConfig); } - private MySqlSourceConfig getConfig(String[] captureTables, UniqueDatabase database) { + private MySqlSourceConfig getConfig( + String[] captureTables, UniqueDatabase database, boolean tinyint1IsBit) { String[] captureTableIds = Arrays.stream(captureTables) .map(tableName -> database.getDatabaseName() + "." + tableName) .toArray(String[]::new); + Properties jdbcProperties = new Properties(); + jdbcProperties.put(PropertyKey.tinyInt1isBit.getKeyName(), String.valueOf(tinyint1IsBit)); + return new MySqlSourceConfigFactory() .startupOptions(StartupOptions.latest()) .databaseList(database.getDatabaseName()) @@ -380,6 +402,7 @@ private MySqlSourceConfig getConfig(String[] captureTables, UniqueDatabase datab .username(database.getUsername()) .password(database.getPassword()) .serverTimeZone(ZoneId.of("UTC").toString()) + .jdbcProperties(jdbcProperties) .createConfig(0); } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java index f2b4ccb82f7..00ed374b9d1 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java @@ -51,6 +51,7 @@ import org.apache.flink.table.planner.factories.TestValuesTableFactory; import org.apache.flink.util.CloseableIterator; +import com.mysql.cj.conf.PropertyKey; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; @@ -61,11 +62,7 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; +import java.util.*; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -259,9 +256,22 @@ private static List fetchResultsExcept(Iterator iter, int size, T side } @Test - public void testParseAlterStatement() throws Exception { + public void testParseAlterStatementTinyintIsBit() throws Exception { + testParseAlterStatement(true); + } + + @Test + public void testParseAlterStatementTinyint1IsNotBit() throws Exception { + testParseAlterStatement(false); + } + + public void testParseAlterStatement(boolean tinyint1IsBit) throws Exception { env.setParallelism(1); inventoryDatabase.createAndInitialize(); + + Properties jdbcProperties = new Properties(); + jdbcProperties.put(PropertyKey.tinyInt1isBit.getKeyName(), String.valueOf(tinyint1IsBit)); + MySqlSourceConfigFactory configFactory = new MySqlSourceConfigFactory() .hostname(MYSQL8_CONTAINER.getHost()) @@ -273,6 +283,7 @@ public void testParseAlterStatement() throws Exception { .startupOptions(StartupOptions.latest()) .serverId(getServerId(env.getParallelism())) .serverTimeZone("UTC") + .jdbcProperties(jdbcProperties) .includeSchemaChanges(SCHEMA_CHANGE_ENABLED.defaultValue()); FlinkSourceProvider sourceProvider = @@ -373,6 +384,21 @@ public void testParseAlterStatement() throws Exception { new AddColumnEvent.ColumnWithPosition( Column.physicalColumn("cols9", DataTypes.CHAR(1)))))); + statement.execute( + String.format( + "ALTER TABLE `%s`.`products` ADD COLUMN `cols10` TINYINT(1) NULL;", + inventoryDatabase.getDatabaseName())); + expected.add( + new AddColumnEvent( + tableId, + Collections.singletonList( + new AddColumnEvent.ColumnWithPosition( + Column.physicalColumn( + "cols10", + tinyint1IsBit + ? DataTypes.BOOLEAN() + : DataTypes.TINYINT()))))); + // Drop orders table first to remove foreign key restraints statement.execute( String.format( @@ -394,9 +420,22 @@ public void testParseAlterStatement() throws Exception { } @Test - public void testSchemaChangeEvents() throws Exception { + public void testSchemaChangeEventsTinyint1IsBit() throws Exception { + testSchemaChangeEvents(true); + } + + @Test + public void testSchemaChangeEventsTinyint1IsNotBit() throws Exception { + testSchemaChangeEvents(false); + } + + public void testSchemaChangeEvents(boolean tinyint1IsBit) throws Exception { env.setParallelism(1); inventoryDatabase.createAndInitialize(); + + Properties jdbcProperties = new Properties(); + jdbcProperties.put(PropertyKey.tinyInt1isBit.getKeyName(), String.valueOf(tinyint1IsBit)); + MySqlSourceConfigFactory configFactory = new MySqlSourceConfigFactory() .hostname(MYSQL8_CONTAINER.getHost()) @@ -408,6 +447,7 @@ public void testSchemaChangeEvents() throws Exception { .startupOptions(StartupOptions.latest()) .serverId(getServerId(env.getParallelism())) .serverTimeZone("UTC") + .jdbcProperties(jdbcProperties) .includeSchemaChanges(SCHEMA_CHANGE_ENABLED.defaultValue()); FlinkSourceProvider sourceProvider = @@ -439,6 +479,22 @@ public void testSchemaChangeEvents() throws Exception { new AddColumnEvent.ColumnWithPosition( Column.physicalColumn("newcol1", DataTypes.INT()))))); + // Add a TINYINT(1) column + statement.execute( + String.format( + "ALTER TABLE `%s`.`customers` ADD COLUMN `new_tinyint1_col1` TINYINT(1) NULL;", + inventoryDatabase.getDatabaseName())); + expected.add( + new AddColumnEvent( + TableId.tableId(inventoryDatabase.getDatabaseName(), "customers"), + Collections.singletonList( + new AddColumnEvent.ColumnWithPosition( + Column.physicalColumn( + "new_tinyint1_col1", + tinyint1IsBit + ? DataTypes.BOOLEAN() + : DataTypes.TINYINT()))))); + // Test MODIFY COLUMN DDL statement.execute( String.format( @@ -450,6 +506,16 @@ public void testSchemaChangeEvents() throws Exception { TableId.tableId(inventoryDatabase.getDatabaseName(), "customers"), Collections.singletonMap("newcol1", DataTypes.DOUBLE()))); + statement.execute( + String.format( + "ALTER TABLE `%s`.`customers` MODIFY COLUMN `new_tinyint1_col1` INT;", + inventoryDatabase.getDatabaseName())); + + expected.add( + new AlterColumnTypeEvent( + TableId.tableId(inventoryDatabase.getDatabaseName(), "customers"), + Collections.singletonMap("new_tinyint1_col1", DataTypes.INT()))); + // Test CHANGE COLUMN DDL statement.execute( String.format( @@ -615,7 +681,11 @@ public void testSchemaChangeEvents() throws Exception { .physicalColumn("big_decimal_c", DataTypes.STRING()) .physicalColumn("bit1_c", DataTypes.BOOLEAN()) .physicalColumn("bit3_c", DataTypes.BINARY(1)) - .physicalColumn("tiny1_c", DataTypes.BOOLEAN()) + .physicalColumn( + "tiny1_c", + tinyint1IsBit + ? DataTypes.BOOLEAN() + : DataTypes.TINYINT()) .physicalColumn("boolean_c", DataTypes.BOOLEAN()) .physicalColumn("file_uuid", DataTypes.BINARY(16)) .physicalColumn("bit_c", DataTypes.BINARY(8)) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java index 31173469b4f..bbc8dce30db 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java @@ -226,7 +226,8 @@ private boolean shouldEmit(SourceRecord sourceRecord) { RowType splitKeyType = ChunkUtils.getChunkKeyColumnType( statefulTaskContext.getDatabaseSchema().tableFor(tableId), - statefulTaskContext.getSourceConfig().getChunkKeyColumns()); + statefulTaskContext.getSourceConfig().getChunkKeyColumns(), + statefulTaskContext.getSourceConfig().getJdbcProperties()); Struct target = RecordUtils.getStructContainsChunkKey(sourceRecord); Object[] chunkKey = diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/schema/MySqlTypeUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/schema/MySqlTypeUtils.java index 57763285251..e14bfc988ca 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/schema/MySqlTypeUtils.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/schema/MySqlTypeUtils.java @@ -20,8 +20,11 @@ import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.types.DataType; +import com.mysql.cj.conf.PropertyKey; import io.debezium.relational.Column; +import java.util.Properties; + /** Utilities for converting from MySQL types to Flink types. */ public class MySqlTypeUtils { @@ -107,8 +110,8 @@ public class MySqlTypeUtils { private static final String UNKNOWN = "UNKNOWN"; /** Returns a corresponding Flink data type from a debezium {@link Column}. */ - public static DataType fromDbzColumn(Column column) { - DataType dataType = convertFromColumn(column); + public static DataType fromDbzColumn(Column column, Properties jdbcProperties) { + DataType dataType = convertFromColumn(column, jdbcProperties); if (column.isOptional()) { return dataType; } else { @@ -120,7 +123,7 @@ public static DataType fromDbzColumn(Column column) { * Returns a corresponding Flink data type from a debezium {@link Column} with nullable always * be true. */ - private static DataType convertFromColumn(Column column) { + private static DataType convertFromColumn(Column column, Properties jdbcProperties) { String typeName = column.typeName(); switch (typeName) { case BIT: @@ -135,7 +138,13 @@ private static DataType convertFromColumn(Column column) { // user should not use tinyint(1) to store number although jdbc url parameter // tinyInt1isBit=false can help change the return value, it's not a general way // btw: mybatis and mysql-connector-java map tinyint(1) to boolean by default - return column.length() == 1 ? DataTypes.BOOLEAN() : DataTypes.TINYINT(); + boolean tinyInt1isBit = + Boolean.parseBoolean( + jdbcProperties.getProperty( + PropertyKey.tinyInt1isBit.getKeyName(), "true")); + return (column.length() == 1 && tinyInt1isBit) + ? DataTypes.BOOLEAN() + : DataTypes.TINYINT(); case TINYINT_UNSIGNED: case TINYINT_UNSIGNED_ZEROFILL: case SMALLINT: diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java index ec0bab77fe7..101ecc116bb 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java @@ -52,6 +52,7 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.Properties; import static java.math.BigDecimal.ROUND_CEILING; @@ -148,7 +149,8 @@ private void analyzeTable(MySqlPartition partition, TableId tableId) { splitColumn = ChunkUtils.getChunkKeyColumn( currentSplittingTable, sourceConfig.getChunkKeyColumns()); - splitType = ChunkUtils.getChunkKeyColumnType(splitColumn); + splitType = + ChunkUtils.getChunkKeyColumnType(splitColumn, sourceConfig.getJdbcProperties()); minMaxOfSplitColumn = StatementUtils.queryMinMax(jdbcConnection, tableId, splitColumn.name()); approximateRowCnt = StatementUtils.queryApproximateRowCnt(jdbcConnection, tableId); @@ -391,7 +393,7 @@ private int getDynamicChunkSize( Object max, int chunkSize, long approximateRowCnt) { - if (!isEvenlySplitColumn(splitColumn)) { + if (!isEvenlySplitColumn(splitColumn, sourceConfig.getJdbcProperties())) { return -1; } final double distributionFactorUpper = sourceConfig.getDistributionFactorUpper(); @@ -416,8 +418,8 @@ private int getDynamicChunkSize( } /** Checks whether split column is evenly distributed across its range. */ - private static boolean isEvenlySplitColumn(Column splitColumn) { - DataType flinkType = MySqlTypeUtils.fromDbzColumn(splitColumn); + private static boolean isEvenlySplitColumn(Column splitColumn, Properties jdbcProperties) { + DataType flinkType = MySqlTypeUtils.fromDbzColumn(splitColumn, jdbcProperties); LogicalTypeRoot typeRoot = flinkType.getLogicalType().getTypeRoot(); // currently, we only support the optimization that split column with type BIGINT, INT, diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/ChunkUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/ChunkUtils.java index 49325609988..a58f5139488 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/ChunkUtils.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/ChunkUtils.java @@ -34,6 +34,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Properties; import java.util.stream.Collectors; import static org.apache.flink.table.api.DataTypes.FIELD; @@ -45,13 +46,15 @@ public class ChunkUtils { private ChunkUtils() {} public static RowType getChunkKeyColumnType( - Table table, Map chunkKeyColumns) { - return getChunkKeyColumnType(getChunkKeyColumn(table, chunkKeyColumns)); + Table table, Map chunkKeyColumns, Properties jdbcProperties) { + return getChunkKeyColumnType(getChunkKeyColumn(table, chunkKeyColumns), jdbcProperties); } - public static RowType getChunkKeyColumnType(Column chunkKeyColumn) { + public static RowType getChunkKeyColumnType(Column chunkKeyColumn, Properties jdbcProperties) { return (RowType) - ROW(FIELD(chunkKeyColumn.name(), MySqlTypeUtils.fromDbzColumn(chunkKeyColumn))) + ROW(FIELD( + chunkKeyColumn.name(), + MySqlTypeUtils.fromDbzColumn(chunkKeyColumn, jdbcProperties))) .getLogicalType(); } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java index de875a0ed75..48459191435 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java @@ -32,19 +32,14 @@ import org.apache.flink.table.types.logical.RowType; import org.apache.flink.util.ExceptionUtils; +import com.mysql.cj.conf.PropertyKey; import io.debezium.relational.Column; import io.debezium.relational.TableId; import org.junit.BeforeClass; import org.junit.Test; import java.time.ZoneId; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; +import java.util.*; import java.util.stream.Collectors; import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND; @@ -574,9 +569,12 @@ private List getTestAssignSnapshotSplitsFromCheckpoint(AssignerStatus as List alreadyProcessedTables = new ArrayList<>(); alreadyProcessedTables.add(processedTable); + Properties jdbcProperties = new Properties(); + jdbcProperties.put(PropertyKey.tinyInt1isBit.getKeyName(), "true"); RowType splitKeyType = ChunkUtils.getChunkKeyColumnType( - Column.editor().name("id").type("INT").jdbcType(4).create()); + Column.editor().name("id").type("INT").jdbcType(4).create(), + jdbcProperties); List remainingSplits = Arrays.asList( new MySqlSchemalessSnapshotSplit( From cb4e212df44fa08b1b6be20e689a54682e52067a Mon Sep 17 00:00:00 2001 From: North Lin <37775475+qg-lin@users.noreply.github.com> Date: Sun, 15 Sep 2024 22:18:31 +0800 Subject: [PATCH 2/9] reformat code --- .../source/assigners/MySqlSnapshotSplitAssignerTest.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java index 48459191435..47b7f93143d 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java @@ -39,7 +39,14 @@ import org.junit.Test; import java.time.ZoneId; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; import java.util.stream.Collectors; import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND; From 10b3e8f30a2ccb5efd74d4e9765a202056e18680 Mon Sep 17 00:00:00 2001 From: North Lin <37775475+qg-lin@users.noreply.github.com> Date: Thu, 19 Sep 2024 12:32:42 +0800 Subject: [PATCH 3/9] Update MySqlPipelineITCase.java --- .../cdc/connectors/mysql/source/MySqlPipelineITCase.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java index 00ed374b9d1..86a87031bff 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java @@ -62,7 +62,12 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Properties; import java.util.stream.Collectors; import java.util.stream.Stream; From 27ba6ddc0803aeec800380cb24260171b5c9cd1f Mon Sep 17 00:00:00 2001 From: "north.lin" <37775475+qg-lin@users.noreply.github.com> Date: Sat, 11 Jan 2025 21:29:14 +0800 Subject: [PATCH 4/9] pass a boolean value instead of Properties --- .../mysql/source/MySqlDataSource.java | 9 +++++++- .../mysql/source/MySqlEventDeserializer.java | 15 ++++++------- .../CustomAlterTableParserListener.java | 13 ++++++------ .../parser/CustomMySqlAntlrDdlParser.java | 9 ++++---- .../CustomMySqlAntlrDdlParserListener.java | 6 ++---- .../reader/MySqlPipelineRecordEmitter.java | 9 ++++++-- .../mysql/utils/MySqlSchemaUtils.java | 17 +++++++++------ .../mysql/utils/MySqlTypeUtils.java | 13 +++--------- .../source/MySqlMetadataAccessorITCase.java | 12 +++++------ .../mysql/source/MySqlPipelineITCase.java | 17 ++++++++------- .../debezium/reader/BinlogSplitReader.java | 10 ++++++++- .../mysql/schema/MySqlTypeUtils.java | 13 +++--------- .../source/assigners/MySqlChunkSplitter.java | 21 +++++++++++++------ .../mysql/source/utils/ChunkUtils.java | 9 ++++---- .../MySqlSnapshotSplitAssignerTest.java | 7 +------ 15 files changed, 94 insertions(+), 86 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSource.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSource.java index da04d5eb47c..ec87e71ece5 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSource.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSource.java @@ -31,6 +31,8 @@ import org.apache.flink.cdc.connectors.mysql.table.MySqlReadableMetadata; import org.apache.flink.cdc.debezium.table.DebeziumChangelogMode; +import com.mysql.cj.conf.PropertyKey; + import java.util.ArrayList; import java.util.List; @@ -57,12 +59,17 @@ public MySqlDataSource( @Override public EventSourceProvider getEventSourceProvider() { + boolean tinyInt1isBit = + Boolean.parseBoolean( + sourceConfig + .getJdbcProperties() + .getProperty(PropertyKey.tinyInt1isBit.getKeyName(), "true")); MySqlEventDeserializer deserializer = new MySqlEventDeserializer( DebeziumChangelogMode.ALL, sourceConfig.isIncludeSchemaChanges(), readableMetadataList, - sourceConfig.getJdbcProperties()); + tinyInt1isBit); MySqlSource source = new MySqlSource<>( diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlEventDeserializer.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlEventDeserializer.java index f0ed6aa9324..ab32cf998dd 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlEventDeserializer.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlEventDeserializer.java @@ -47,7 +47,6 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.Properties; import static org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils.getHistoryRecord; @@ -63,7 +62,7 @@ public class MySqlEventDeserializer extends DebeziumEventDeserializationSchema { private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private final boolean includeSchemaChanges; - private final Properties jdbcProperties; + private final boolean tinyInt1isBit; private transient Tables tables; private transient CustomMySqlAntlrDdlParser customParser; @@ -71,28 +70,26 @@ public class MySqlEventDeserializer extends DebeziumEventDeserializationSchema { private List readableMetadataList; public MySqlEventDeserializer( - DebeziumChangelogMode changelogMode, - boolean includeSchemaChanges, - Properties jdbcProperties) { - this(changelogMode, includeSchemaChanges, new ArrayList<>(), jdbcProperties); + DebeziumChangelogMode changelogMode, boolean includeSchemaChanges) { + this(changelogMode, includeSchemaChanges, new ArrayList<>(), true); } public MySqlEventDeserializer( DebeziumChangelogMode changelogMode, boolean includeSchemaChanges, List readableMetadataList, - Properties jdbcProperties) { + boolean tinyInt1isBit) { super(new MySqlSchemaDataTypeInference(), changelogMode); this.includeSchemaChanges = includeSchemaChanges; this.readableMetadataList = readableMetadataList; - this.jdbcProperties = jdbcProperties; + this.tinyInt1isBit = tinyInt1isBit; } @Override protected List deserializeSchemaChangeRecord(SourceRecord record) { if (includeSchemaChanges) { if (customParser == null) { - customParser = new CustomMySqlAntlrDdlParser(jdbcProperties); + customParser = new CustomMySqlAntlrDdlParser(tinyInt1isBit); tables = new Tables(); } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java index e8a3cf06841..31ee183e53c 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java @@ -46,7 +46,6 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Properties; import java.util.stream.Collectors; import static org.apache.flink.cdc.connectors.mysql.utils.MySqlTypeUtils.fromDbzColumn; @@ -61,7 +60,7 @@ public class CustomAlterTableParserListener extends MySqlParserBaseListener { private final MySqlAntlrDdlParser parser; private final List listeners; private final LinkedList changes; - private final Properties jdbcProperties; + private final boolean tinyInt1isBit; private org.apache.flink.cdc.common.event.TableId currentTable; private List columnEditors; private CustomColumnDefinitionParserListener columnDefinitionListener; @@ -73,11 +72,11 @@ public CustomAlterTableParserListener( MySqlAntlrDdlParser parser, List listeners, LinkedList changes, - Properties jdbcProperties) { + boolean tinyInt1isBit) { this.parser = parser; this.listeners = listeners; this.changes = changes; - this.jdbcProperties = jdbcProperties; + this.tinyInt1isBit = tinyInt1isBit; } @Override @@ -319,7 +318,7 @@ public void exitAlterByChangeColumn(MySqlParser.AlterByChangeColumnContext ctx) String newColumnName = parser.parseName(ctx.newColumn); Map typeMapping = new HashMap<>(); - typeMapping.put(column.name(), fromDbzColumn(column, jdbcProperties)); + typeMapping.put(column.name(), fromDbzColumn(column, tinyInt1isBit)); changes.add(new AlterColumnTypeEvent(currentTable, typeMapping)); if (newColumnName != null && !column.name().equalsIgnoreCase(newColumnName)) { @@ -370,7 +369,7 @@ public void exitAlterByModifyColumn(MySqlParser.AlterByModifyColumnContext ctx) () -> { Column column = columnDefinitionListener.getColumn(); Map typeMapping = new HashMap<>(); - typeMapping.put(column.name(), fromDbzColumn(column, jdbcProperties)); + typeMapping.put(column.name(), fromDbzColumn(column, tinyInt1isBit)); changes.add(new AlterColumnTypeEvent(currentTable, typeMapping)); listeners.remove(columnDefinitionListener); }, @@ -417,7 +416,7 @@ public void exitDropTable(MySqlParser.DropTableContext ctx) { private org.apache.flink.cdc.common.schema.Column toCdcColumn(Column dbzColumn) { return org.apache.flink.cdc.common.schema.Column.physicalColumn( dbzColumn.name(), - fromDbzColumn(dbzColumn, jdbcProperties), + fromDbzColumn(dbzColumn, tinyInt1isBit), dbzColumn.comment(), dbzColumn.defaultValueExpression().orElse(null)); } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomMySqlAntlrDdlParser.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomMySqlAntlrDdlParser.java index 2456c93f18e..ed4fe7c978e 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomMySqlAntlrDdlParser.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomMySqlAntlrDdlParser.java @@ -29,18 +29,17 @@ import java.util.Arrays; import java.util.LinkedList; import java.util.List; -import java.util.Properties; /** A ddl parser that will use custom listener. */ public class CustomMySqlAntlrDdlParser extends MySqlAntlrDdlParser { private final LinkedList parsedEvents; - private final Properties jdbcProperties; + private final boolean tinyInt1isBit; - public CustomMySqlAntlrDdlParser(Properties jdbcProperties) { + public CustomMySqlAntlrDdlParser(boolean tinyInt1isBit) { super(); this.parsedEvents = new LinkedList<>(); - this.jdbcProperties = jdbcProperties; + this.tinyInt1isBit = tinyInt1isBit; } // Overriding this method because the BIT type requires default length dimension of 1. @@ -280,7 +279,7 @@ protected DataTypeResolver initializeDataTypeResolver() { @Override protected AntlrDdlParserListener createParseTreeWalkerListener() { - return new CustomMySqlAntlrDdlParserListener(this, parsedEvents, jdbcProperties); + return new CustomMySqlAntlrDdlParserListener(this, parsedEvents, tinyInt1isBit); } public List getAndClearParsedEvents() { diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomMySqlAntlrDdlParserListener.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomMySqlAntlrDdlParserListener.java index c9400b3f11f..7e24e264759 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomMySqlAntlrDdlParserListener.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomMySqlAntlrDdlParserListener.java @@ -47,7 +47,6 @@ import java.util.Collection; import java.util.LinkedList; import java.util.List; -import java.util.Properties; import java.util.concurrent.CopyOnWriteArrayList; /** @@ -77,14 +76,13 @@ public class CustomMySqlAntlrDdlParserListener extends MySqlParserBaseListener public CustomMySqlAntlrDdlParserListener( MySqlAntlrDdlParser parser, LinkedList parsedEvents, - Properties jdbcProperties) { + boolean tinyInt1isBit) { // initialize listeners listeners.add(new CreateAndAlterDatabaseParserListener(parser)); listeners.add(new DropDatabaseParserListener(parser)); listeners.add(new CreateTableParserListener(parser, listeners)); listeners.add( - new CustomAlterTableParserListener( - parser, listeners, parsedEvents, jdbcProperties)); + new CustomAlterTableParserListener(parser, listeners, parsedEvents, tinyInt1isBit)); listeners.add(new DropTableParserListener(parser)); listeners.add(new RenameTableParserListener(parser)); listeners.add(new TruncateTableParserListener(parser)); diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java index 8602ae6d5ac..63232bc9d75 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java @@ -33,6 +33,7 @@ import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema; import org.apache.flink.connector.base.source.reader.RecordEmitter; +import com.mysql.cj.conf.PropertyKey; import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser; import io.debezium.jdbc.JdbcConnection; import io.debezium.relational.Column; @@ -201,8 +202,12 @@ private Schema parseDDL(String ddlStatement, TableId tableId) { Column column = columns.get(i); String colName = column.name(); - DataType dataType = - MySqlTypeUtils.fromDbzColumn(column, sourceConfig.getJdbcProperties()); + boolean tinyInt1isBit = + Boolean.parseBoolean( + sourceConfig + .getJdbcProperties() + .getProperty(PropertyKey.tinyInt1isBit.getKeyName(), "true")); + DataType dataType = MySqlTypeUtils.fromDbzColumn(column, tinyInt1isBit); if (!column.isOptional()) { dataType = dataType.notNull(); } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlSchemaUtils.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlSchemaUtils.java index 7cdfdaa433a..1aa80c0bea2 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlSchemaUtils.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlSchemaUtils.java @@ -23,6 +23,7 @@ import org.apache.flink.cdc.connectors.mysql.schema.MySqlSchema; import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig; +import com.mysql.cj.conf.PropertyKey; import io.debezium.connector.mysql.MySqlConnection; import io.debezium.connector.mysql.MySqlPartition; import io.debezium.jdbc.JdbcConnection; @@ -37,7 +38,6 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.Properties; import java.util.stream.Collectors; import static org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils.createMySqlConnection; @@ -130,14 +130,19 @@ public static Schema getTableSchema( new MySqlSchema(sourceConfig, jdbc.isTableIdCaseSensitive())) { TableChanges.TableChange tableSchema = mySqlSchema.getTableSchema(partition, jdbc, toDbzTableId(tableId)); - return toSchema(tableSchema.getTable(), sourceConfig.getJdbcProperties()); + boolean tinyInt1isBit = + Boolean.parseBoolean( + sourceConfig + .getJdbcProperties() + .getProperty(PropertyKey.tinyInt1isBit.getKeyName(), "true")); + return toSchema(tableSchema.getTable(), tinyInt1isBit); } } - public static Schema toSchema(Table table, Properties jdbcProperties) { + public static Schema toSchema(Table table, boolean tinyInt1isBit) { List columns = table.columns().stream() - .map(column -> toColumn(column, jdbcProperties)) + .map(column -> toColumn(column, tinyInt1isBit)) .collect(Collectors.toList()); return Schema.newBuilder() @@ -147,10 +152,10 @@ public static Schema toSchema(Table table, Properties jdbcProperties) { .build(); } - public static Column toColumn(io.debezium.relational.Column column, Properties jdbcProperties) { + public static Column toColumn(io.debezium.relational.Column column, boolean tinyInt1isBit) { return Column.physicalColumn( column.name(), - MySqlTypeUtils.fromDbzColumn(column, jdbcProperties), + MySqlTypeUtils.fromDbzColumn(column, tinyInt1isBit), column.comment()); } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlTypeUtils.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlTypeUtils.java index cee1b404dee..05a23f7cc85 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlTypeUtils.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlTypeUtils.java @@ -22,11 +22,8 @@ import org.apache.flink.cdc.common.types.DataType; import org.apache.flink.cdc.common.types.DataTypes; -import com.mysql.cj.conf.PropertyKey; import io.debezium.relational.Column; -import java.util.Properties; - /** Utilities for converting from MySQL types to {@link DataType}s. */ public class MySqlTypeUtils { @@ -113,8 +110,8 @@ public class MySqlTypeUtils { private static final int FLOAT_LENGTH_UNSPECIFIED_FLAG = -1; /** Returns a corresponding Flink data type from a debezium {@link Column}. */ - public static DataType fromDbzColumn(Column column, Properties jdbcProperties) { - DataType dataType = convertFromColumn(column, jdbcProperties); + public static DataType fromDbzColumn(Column column, boolean tinyInt1isBit) { + DataType dataType = convertFromColumn(column, tinyInt1isBit); if (column.isOptional()) { return dataType; } else { @@ -126,7 +123,7 @@ public static DataType fromDbzColumn(Column column, Properties jdbcProperties) { * Returns a corresponding Flink data type from a debezium {@link Column} with nullable always * be true. */ - private static DataType convertFromColumn(Column column, Properties jdbcProperties) { + private static DataType convertFromColumn(Column column, boolean tinyInt1isBit) { String typeName = column.typeName(); switch (typeName) { case BIT: @@ -141,10 +138,6 @@ private static DataType convertFromColumn(Column column, Properties jdbcProperti // user should not use tinyint(1) to store number although jdbc url parameter // tinyInt1isBit=false can help change the return value, it's not a general way // btw: mybatis and mysql-connector-java map tinyint(1) to boolean by default - boolean tinyInt1isBit = - Boolean.parseBoolean( - jdbcProperties.getProperty( - PropertyKey.tinyInt1isBit.getKeyName(), "true")); return (column.length() == 1 && tinyInt1isBit) ? DataTypes.BOOLEAN() : DataTypes.TINYINT(); diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlMetadataAccessorITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlMetadataAccessorITCase.java index cb0a771e66c..45b998bbc77 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlMetadataAccessorITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlMetadataAccessorITCase.java @@ -104,22 +104,22 @@ public void testMysql8AccessDatabaseAndTable() { } @Test - public void testMysql57AccessCommonTypesSchemaTinyintIsBit() { + public void testMysql57AccessCommonTypesSchemaTinyInt1isBit() { testAccessCommonTypesSchema(fullTypesMySql57Database, true); } @Test - public void testMysql57AccessCommonTypesSchemaTinyintIsNotBit() { + public void testMysql57AccessCommonTypesSchemaTinyInt1isNotBit() { testAccessCommonTypesSchema(fullTypesMySql57Database, false); } @Test - public void testMysql8AccessCommonTypesSchemaTinyintIsBit() { + public void testMysql8AccessCommonTypesSchemaTinyInt1isBit() { testAccessCommonTypesSchema(fullTypesMySql8Database, true); } @Test - public void testMysql8AccessCommonTypesSchemaTinyintIsNotBit() { + public void testMysql8AccessCommonTypesSchemaTinyInt1isNotBit() { testAccessCommonTypesSchema(fullTypesMySql8Database, false); } @@ -225,7 +225,7 @@ public void testMysql57PrecisionTypesSchema() { String[] tables = new String[] {"precision_types"}; MySqlMetadataAccessor metadataAccessor = - getMetadataAccessor(tables, fullTypesMySql57Database); + getMetadataAccessor(tables, fullTypesMySql57Database, true); Schema actualSchema = metadataAccessor.getTableSchema( @@ -300,7 +300,7 @@ public void testMysql8PrecisionTypesSchema() { String[] tables = new String[] {"precision_types"}; MySqlMetadataAccessor metadataAccessor = - getMetadataAccessor(tables, fullTypesMySql8Database); + getMetadataAccessor(tables, fullTypesMySql8Database, false); Schema actualSchema = metadataAccessor.getTableSchema( diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java index 03e53a9f6b0..e0f1fa76e9f 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java @@ -276,6 +276,7 @@ public void testParseAlterStatementTinyint1IsNotBit() throws Exception { testParseAlterStatement(false); } + @Test public void testInitialStartupModeWithOpTs() throws Exception { inventoryDatabase.createAndInitialize(); Configuration sourceConfiguration = new Configuration(); @@ -437,12 +438,12 @@ public void testInitialStartupModeWithOpTs() throws Exception { } } - public void testParseAlterStatement(boolean tinyint1IsBit) throws Exception { + public void testParseAlterStatement(boolean tinyInt1isBit) throws Exception { env.setParallelism(1); inventoryDatabase.createAndInitialize(); Properties jdbcProperties = new Properties(); - jdbcProperties.put(PropertyKey.tinyInt1isBit.getKeyName(), String.valueOf(tinyint1IsBit)); + jdbcProperties.put(PropertyKey.tinyInt1isBit.getKeyName(), String.valueOf(tinyInt1isBit)); MySqlSourceConfigFactory configFactory = new MySqlSourceConfigFactory() @@ -567,7 +568,7 @@ public void testParseAlterStatement(boolean tinyint1IsBit) throws Exception { new AddColumnEvent.ColumnWithPosition( Column.physicalColumn( "cols10", - tinyint1IsBit + tinyInt1isBit ? DataTypes.BOOLEAN() : DataTypes.TINYINT()))))); @@ -592,7 +593,7 @@ public void testParseAlterStatement(boolean tinyint1IsBit) throws Exception { } @Test - public void testSchemaChangeEventsTinyint1IsBit() throws Exception { + public void testSchemaChangeEventstinyInt1isBit() throws Exception { testSchemaChangeEvents(true); } @@ -601,12 +602,12 @@ public void testSchemaChangeEventsTinyint1IsNotBit() throws Exception { testSchemaChangeEvents(false); } - public void testSchemaChangeEvents(boolean tinyint1IsBit) throws Exception { + public void testSchemaChangeEvents(boolean tinyInt1isBit) throws Exception { env.setParallelism(1); inventoryDatabase.createAndInitialize(); Properties jdbcProperties = new Properties(); - jdbcProperties.put(PropertyKey.tinyInt1isBit.getKeyName(), String.valueOf(tinyint1IsBit)); + jdbcProperties.put(PropertyKey.tinyInt1isBit.getKeyName(), String.valueOf(tinyInt1isBit)); MySqlSourceConfigFactory configFactory = new MySqlSourceConfigFactory() @@ -663,7 +664,7 @@ public void testSchemaChangeEvents(boolean tinyint1IsBit) throws Exception { new AddColumnEvent.ColumnWithPosition( Column.physicalColumn( "new_tinyint1_col1", - tinyint1IsBit + tinyInt1isBit ? DataTypes.BOOLEAN() : DataTypes.TINYINT()))))); @@ -855,7 +856,7 @@ public void testSchemaChangeEvents(boolean tinyint1IsBit) throws Exception { .physicalColumn("bit3_c", DataTypes.BINARY(1)) .physicalColumn( "tiny1_c", - tinyint1IsBit + tinyInt1isBit ? DataTypes.BOOLEAN() : DataTypes.TINYINT()) .physicalColumn("boolean_c", DataTypes.BOOLEAN()) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java index bbc8dce30db..64d5c62d608 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java @@ -36,6 +36,7 @@ import com.github.shyiko.mysql.binlog.event.Event; import com.github.shyiko.mysql.binlog.event.EventType; +import com.mysql.cj.conf.PropertyKey; import io.debezium.connector.base.ChangeEventQueue; import io.debezium.connector.mysql.MySqlStreamingChangeEventSourceMetrics; import io.debezium.pipeline.DataChangeEvent; @@ -223,11 +224,18 @@ private boolean shouldEmit(SourceRecord sourceRecord) { // only the table who captured snapshot splits need to filter if (finishedSplitsInfo.containsKey(tableId)) { + boolean tinyInt1isBit = + Boolean.parseBoolean( + statefulTaskContext + .getSourceConfig() + .getJdbcProperties() + .getProperty( + PropertyKey.tinyInt1isBit.getKeyName(), "true")); RowType splitKeyType = ChunkUtils.getChunkKeyColumnType( statefulTaskContext.getDatabaseSchema().tableFor(tableId), statefulTaskContext.getSourceConfig().getChunkKeyColumns(), - statefulTaskContext.getSourceConfig().getJdbcProperties()); + tinyInt1isBit); Struct target = RecordUtils.getStructContainsChunkKey(sourceRecord); Object[] chunkKey = diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/schema/MySqlTypeUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/schema/MySqlTypeUtils.java index e14bfc988ca..bd770081832 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/schema/MySqlTypeUtils.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/schema/MySqlTypeUtils.java @@ -20,11 +20,8 @@ import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.types.DataType; -import com.mysql.cj.conf.PropertyKey; import io.debezium.relational.Column; -import java.util.Properties; - /** Utilities for converting from MySQL types to Flink types. */ public class MySqlTypeUtils { @@ -110,8 +107,8 @@ public class MySqlTypeUtils { private static final String UNKNOWN = "UNKNOWN"; /** Returns a corresponding Flink data type from a debezium {@link Column}. */ - public static DataType fromDbzColumn(Column column, Properties jdbcProperties) { - DataType dataType = convertFromColumn(column, jdbcProperties); + public static DataType fromDbzColumn(Column column, boolean tinyInt1isBit) { + DataType dataType = convertFromColumn(column, tinyInt1isBit); if (column.isOptional()) { return dataType; } else { @@ -123,7 +120,7 @@ public static DataType fromDbzColumn(Column column, Properties jdbcProperties) { * Returns a corresponding Flink data type from a debezium {@link Column} with nullable always * be true. */ - private static DataType convertFromColumn(Column column, Properties jdbcProperties) { + private static DataType convertFromColumn(Column column, boolean tinyInt1isBit) { String typeName = column.typeName(); switch (typeName) { case BIT: @@ -138,10 +135,6 @@ private static DataType convertFromColumn(Column column, Properties jdbcProperti // user should not use tinyint(1) to store number although jdbc url parameter // tinyInt1isBit=false can help change the return value, it's not a general way // btw: mybatis and mysql-connector-java map tinyint(1) to boolean by default - boolean tinyInt1isBit = - Boolean.parseBoolean( - jdbcProperties.getProperty( - PropertyKey.tinyInt1isBit.getKeyName(), "true")); return (column.length() == 1 && tinyInt1isBit) ? DataTypes.BOOLEAN() : DataTypes.TINYINT(); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java index 73f21640cb5..d65c26c634e 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java @@ -32,6 +32,7 @@ import org.apache.flink.table.types.logical.RowType; import org.apache.flink.util.Preconditions; +import com.mysql.cj.conf.PropertyKey; import io.debezium.connector.mysql.MySqlPartition; import io.debezium.jdbc.JdbcConnection; import io.debezium.relational.Column; @@ -52,7 +53,6 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; -import java.util.Properties; import static java.math.BigDecimal.ROUND_CEILING; @@ -149,8 +149,12 @@ private void analyzeTable(MySqlPartition partition, TableId tableId) { splitColumn = ChunkUtils.getChunkKeyColumn( currentSplittingTable, sourceConfig.getChunkKeyColumns()); - splitType = - ChunkUtils.getChunkKeyColumnType(splitColumn, sourceConfig.getJdbcProperties()); + boolean tinyInt1isBit = + Boolean.parseBoolean( + sourceConfig + .getJdbcProperties() + .getProperty(PropertyKey.tinyInt1isBit.getKeyName(), "true")); + splitType = ChunkUtils.getChunkKeyColumnType(splitColumn, tinyInt1isBit); minMaxOfSplitColumn = StatementUtils.queryMinMax(jdbcConnection, tableId, splitColumn.name()); approximateRowCnt = StatementUtils.queryApproximateRowCnt(jdbcConnection, tableId); @@ -387,7 +391,12 @@ private int getDynamicChunkSize( Object max, int chunkSize, long approximateRowCnt) { - if (!isEvenlySplitColumn(splitColumn, sourceConfig.getJdbcProperties())) { + boolean tinyInt1isBit = + Boolean.parseBoolean( + sourceConfig + .getJdbcProperties() + .getProperty(PropertyKey.tinyInt1isBit.getKeyName(), "true")); + if (!isEvenlySplitColumn(splitColumn, tinyInt1isBit)) { return -1; } final double distributionFactorUpper = sourceConfig.getDistributionFactorUpper(); @@ -412,8 +421,8 @@ private int getDynamicChunkSize( } /** Checks whether split column is evenly distributed across its range. */ - private static boolean isEvenlySplitColumn(Column splitColumn, Properties jdbcProperties) { - DataType flinkType = MySqlTypeUtils.fromDbzColumn(splitColumn, jdbcProperties); + private static boolean isEvenlySplitColumn(Column splitColumn, boolean tinyInt1isBit) { + DataType flinkType = MySqlTypeUtils.fromDbzColumn(splitColumn, tinyInt1isBit); LogicalTypeRoot typeRoot = flinkType.getLogicalType().getTypeRoot(); // currently, we only support the optimization that split column with type BIGINT, INT, diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/ChunkUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/ChunkUtils.java index a58f5139488..405fd1f9638 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/ChunkUtils.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/ChunkUtils.java @@ -34,7 +34,6 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.Properties; import java.util.stream.Collectors; import static org.apache.flink.table.api.DataTypes.FIELD; @@ -46,15 +45,15 @@ public class ChunkUtils { private ChunkUtils() {} public static RowType getChunkKeyColumnType( - Table table, Map chunkKeyColumns, Properties jdbcProperties) { - return getChunkKeyColumnType(getChunkKeyColumn(table, chunkKeyColumns), jdbcProperties); + Table table, Map chunkKeyColumns, boolean tinyInt1isBit) { + return getChunkKeyColumnType(getChunkKeyColumn(table, chunkKeyColumns), tinyInt1isBit); } - public static RowType getChunkKeyColumnType(Column chunkKeyColumn, Properties jdbcProperties) { + public static RowType getChunkKeyColumnType(Column chunkKeyColumn, boolean tinyInt1isBit) { return (RowType) ROW(FIELD( chunkKeyColumn.name(), - MySqlTypeUtils.fromDbzColumn(chunkKeyColumn, jdbcProperties))) + MySqlTypeUtils.fromDbzColumn(chunkKeyColumn, tinyInt1isBit))) .getLogicalType(); } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java index 47b7f93143d..c2791649fd9 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java @@ -32,7 +32,6 @@ import org.apache.flink.table.types.logical.RowType; import org.apache.flink.util.ExceptionUtils; -import com.mysql.cj.conf.PropertyKey; import io.debezium.relational.Column; import io.debezium.relational.TableId; import org.junit.BeforeClass; @@ -46,7 +45,6 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.Properties; import java.util.stream.Collectors; import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND; @@ -576,12 +574,9 @@ private List getTestAssignSnapshotSplitsFromCheckpoint(AssignerStatus as List alreadyProcessedTables = new ArrayList<>(); alreadyProcessedTables.add(processedTable); - Properties jdbcProperties = new Properties(); - jdbcProperties.put(PropertyKey.tinyInt1isBit.getKeyName(), "true"); RowType splitKeyType = ChunkUtils.getChunkKeyColumnType( - Column.editor().name("id").type("INT").jdbcType(4).create(), - jdbcProperties); + Column.editor().name("id").type("INT").jdbcType(4).create(), true); List remainingSplits = Arrays.asList( new MySqlSchemalessSnapshotSplit( From a61f15f41f726b518f2c6f0c4b21d2312af0535d Mon Sep 17 00:00:00 2001 From: "north.lin" <37775475+qg-lin@users.noreply.github.com> Date: Mon, 13 Jan 2025 19:46:26 +0800 Subject: [PATCH 5/9] uodate FAQ --- docs/content.zh/docs/faq/faq.md | 13 +++++++++++++ docs/content/docs/faq/faq.md | 13 +++++++++++++ 2 files changed, 26 insertions(+) diff --git a/docs/content.zh/docs/faq/faq.md b/docs/content.zh/docs/faq/faq.md index 54b55fe9931..0a613f61c78 100644 --- a/docs/content.zh/docs/faq/faq.md +++ b/docs/content.zh/docs/faq/faq.md @@ -207,6 +207,19 @@ restart-strategy.fixed-delay.delay= 30s 1. tableList选项要求表名使用数据库名,而不是DataStream API中的表名。对于MySQL CDC源代码,tableList选项值应该类似于‘my_db.my_table’。 2. 如果要同步排除products和orders表之外的整个my_db库,tableList选项值应该类似于‘my_db.(?!products|orders).*’。 +### Q16: MySQL源表中存在TINYINT(1)类型的列,且部分行的数值>1,Pipeline作业下游接收到的数据却是true/false,为什么? +这是由于MySQL连接参数`tinyInt1isBit`默认值为`true`,导致TINYINT(1)类型的数据被解析为布尔值。 +若需将其转换为实际值,请在source节点添加配置`jdbc.properties.tinyInt1isBit: false`。 +例如: +```yaml +source: + type: mysql + ... + jdbc.properties.tinyInt1isBit: false + +sink: + type: ... +``` ## Postgres CDC FAQ ### Q1: 发现 PG 服务器磁盘使用率高,WAL 不释放 是什么原因? diff --git a/docs/content/docs/faq/faq.md b/docs/content/docs/faq/faq.md index 2b57167d5ab..4042378c4d7 100644 --- a/docs/content/docs/faq/faq.md +++ b/docs/content/docs/faq/faq.md @@ -210,6 +210,19 @@ The reason for this problem is that the reading of the full volume phase of the 1. The `tableList` option requires table name with database name rather than table name in DataStream API. For MySQL CDC source, the `tableList` option value should like ‘my_db.my_table’. 2. If you need to synchronize the whole mydb database excluding the products and orders tables, the `tableList` option value should like 'my_db.(?!products|orders).*'. +### Q16: In MySQL source table, there is a TINYINT(1) column where some rows contain values greater than 1. However, downstreams receive this data as true/false in the pipeline job. Why does this happen? +This is because the default value of the MySQL connection parameter `tinyInt1isBit` is true, which causes the TINYINT(1) data to be interpreted as boolean values. +To convert it to actual values, please add the configuration `jdbc.properties.tinyInt1isBit: false` at the source node. +For example: +```yaml +source: + type: mysql + ... + jdbc.properties.tinyInt1isBit: false + +sink: + type: ... +``` ## Postgres CDC FAQ ### Q1: It is found that the disk utilization rate of PG server is high. What is the reason why wal is not released? From ef5983a22cbb65281d085856a971f06209d87f1e Mon Sep 17 00:00:00 2001 From: "north.lin" <37775475+qg-lin@users.noreply.github.com> Date: Tue, 14 Jan 2025 13:53:27 +0800 Subject: [PATCH 6/9] add a method to get tinyInt1isBit --- .../cdc/connectors/mysql/source/MySqlDataSource.java | 9 +-------- .../source/reader/MySqlPipelineRecordEmitter.java | 9 ++------- .../cdc/connectors/mysql/utils/MySqlSchemaUtils.java | 8 +------- .../mysql/debezium/reader/BinlogSplitReader.java | 10 +--------- .../mysql/source/assigners/MySqlChunkSplitter.java | 8 ++------ .../mysql/source/config/MySqlSourceConfig.java | 6 ++++++ 6 files changed, 13 insertions(+), 37 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSource.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSource.java index ec87e71ece5..0fd1efc1933 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSource.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSource.java @@ -31,8 +31,6 @@ import org.apache.flink.cdc.connectors.mysql.table.MySqlReadableMetadata; import org.apache.flink.cdc.debezium.table.DebeziumChangelogMode; -import com.mysql.cj.conf.PropertyKey; - import java.util.ArrayList; import java.util.List; @@ -59,17 +57,12 @@ public MySqlDataSource( @Override public EventSourceProvider getEventSourceProvider() { - boolean tinyInt1isBit = - Boolean.parseBoolean( - sourceConfig - .getJdbcProperties() - .getProperty(PropertyKey.tinyInt1isBit.getKeyName(), "true")); MySqlEventDeserializer deserializer = new MySqlEventDeserializer( DebeziumChangelogMode.ALL, sourceConfig.isIncludeSchemaChanges(), readableMetadataList, - tinyInt1isBit); + sourceConfig.getTinyInt1isBit()); MySqlSource source = new MySqlSource<>( diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java index 63232bc9d75..07a45ebf5c3 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java @@ -33,7 +33,6 @@ import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema; import org.apache.flink.connector.base.source.reader.RecordEmitter; -import com.mysql.cj.conf.PropertyKey; import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser; import io.debezium.jdbc.JdbcConnection; import io.debezium.relational.Column; @@ -202,12 +201,8 @@ private Schema parseDDL(String ddlStatement, TableId tableId) { Column column = columns.get(i); String colName = column.name(); - boolean tinyInt1isBit = - Boolean.parseBoolean( - sourceConfig - .getJdbcProperties() - .getProperty(PropertyKey.tinyInt1isBit.getKeyName(), "true")); - DataType dataType = MySqlTypeUtils.fromDbzColumn(column, tinyInt1isBit); + DataType dataType = + MySqlTypeUtils.fromDbzColumn(column, sourceConfig.getTinyInt1isBit()); if (!column.isOptional()) { dataType = dataType.notNull(); } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlSchemaUtils.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlSchemaUtils.java index 1aa80c0bea2..08b2179c5f7 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlSchemaUtils.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlSchemaUtils.java @@ -23,7 +23,6 @@ import org.apache.flink.cdc.connectors.mysql.schema.MySqlSchema; import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig; -import com.mysql.cj.conf.PropertyKey; import io.debezium.connector.mysql.MySqlConnection; import io.debezium.connector.mysql.MySqlPartition; import io.debezium.jdbc.JdbcConnection; @@ -130,12 +129,7 @@ public static Schema getTableSchema( new MySqlSchema(sourceConfig, jdbc.isTableIdCaseSensitive())) { TableChanges.TableChange tableSchema = mySqlSchema.getTableSchema(partition, jdbc, toDbzTableId(tableId)); - boolean tinyInt1isBit = - Boolean.parseBoolean( - sourceConfig - .getJdbcProperties() - .getProperty(PropertyKey.tinyInt1isBit.getKeyName(), "true")); - return toSchema(tableSchema.getTable(), tinyInt1isBit); + return toSchema(tableSchema.getTable(), sourceConfig.getTinyInt1isBit()); } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java index 64d5c62d608..871831d75e6 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java @@ -36,7 +36,6 @@ import com.github.shyiko.mysql.binlog.event.Event; import com.github.shyiko.mysql.binlog.event.EventType; -import com.mysql.cj.conf.PropertyKey; import io.debezium.connector.base.ChangeEventQueue; import io.debezium.connector.mysql.MySqlStreamingChangeEventSourceMetrics; import io.debezium.pipeline.DataChangeEvent; @@ -224,18 +223,11 @@ private boolean shouldEmit(SourceRecord sourceRecord) { // only the table who captured snapshot splits need to filter if (finishedSplitsInfo.containsKey(tableId)) { - boolean tinyInt1isBit = - Boolean.parseBoolean( - statefulTaskContext - .getSourceConfig() - .getJdbcProperties() - .getProperty( - PropertyKey.tinyInt1isBit.getKeyName(), "true")); RowType splitKeyType = ChunkUtils.getChunkKeyColumnType( statefulTaskContext.getDatabaseSchema().tableFor(tableId), statefulTaskContext.getSourceConfig().getChunkKeyColumns(), - tinyInt1isBit); + statefulTaskContext.getSourceConfig().getTinyInt1isBit()); Struct target = RecordUtils.getStructContainsChunkKey(sourceRecord); Object[] chunkKey = diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java index d65c26c634e..c7bd4e5ea0d 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java @@ -149,12 +149,8 @@ private void analyzeTable(MySqlPartition partition, TableId tableId) { splitColumn = ChunkUtils.getChunkKeyColumn( currentSplittingTable, sourceConfig.getChunkKeyColumns()); - boolean tinyInt1isBit = - Boolean.parseBoolean( - sourceConfig - .getJdbcProperties() - .getProperty(PropertyKey.tinyInt1isBit.getKeyName(), "true")); - splitType = ChunkUtils.getChunkKeyColumnType(splitColumn, tinyInt1isBit); + splitType = + ChunkUtils.getChunkKeyColumnType(splitColumn, sourceConfig.getTinyInt1isBit()); minMaxOfSplitColumn = StatementUtils.queryMinMax(jdbcConnection, tableId, splitColumn.name()); approximateRowCnt = StatementUtils.queryApproximateRowCnt(jdbcConnection, tableId); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java index dd0ac789666..ea4144a07a1 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java @@ -22,6 +22,7 @@ import org.apache.flink.cdc.connectors.mysql.table.StartupOptions; import org.apache.flink.table.catalog.ObjectPath; +import com.mysql.cj.conf.PropertyKey; import io.debezium.config.Configuration; import io.debezium.connector.mysql.MySqlConnectorConfig; import io.debezium.relational.RelationalTableFilters; @@ -254,4 +255,9 @@ public Map getChunkKeyColumns() { public boolean isSkipSnapshotBackfill() { return skipSnapshotBackfill; } + + public boolean getTinyInt1isBit() { + return Boolean.parseBoolean( + jdbcProperties.getProperty(PropertyKey.tinyInt1isBit.getKeyName(), "true")); + } } From c4aa621046d203668207ef57ff3dd2c21e2f29be Mon Sep 17 00:00:00 2001 From: "north.lin" <37775475+qg-lin@users.noreply.github.com> Date: Thu, 16 Jan 2025 12:42:32 +0800 Subject: [PATCH 7/9] add new cdc config `treat-tinyint1-as-boolean` --- .../connectors/pipeline-connectors/mysql.md | 7 ++++++ docs/content.zh/docs/faq/faq.md | 6 ++--- .../connectors/pipeline-connectors/mysql.md | 7 ++++++ docs/content/docs/faq/faq.md | 6 ++--- .../mysql/factory/MySqlDataSourceFactory.java | 11 +++++++- .../mysql/source/MySqlDataSource.java | 2 +- .../mysql/source/MySqlDataSourceOptions.java | 7 ++++++ .../mysql/source/MySqlEventDeserializer.java | 11 ++++++-- .../reader/MySqlPipelineRecordEmitter.java | 2 +- .../mysql/utils/MySqlSchemaUtils.java | 2 +- .../source/MySqlMetadataAccessorITCase.java | 7 +----- .../mysql/source/MySqlPipelineITCase.java | 25 +++++++++++-------- .../debezium/reader/BinlogSplitReader.java | 2 +- .../source/assigners/MySqlChunkSplitter.java | 3 ++- .../source/config/MySqlSourceConfig.java | 11 ++++---- .../config/MySqlSourceConfigFactory.java | 9 ++++++- 16 files changed, 82 insertions(+), 36 deletions(-) diff --git a/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md b/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md index 0de46fd49db..e898c5c2540 100644 --- a/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md +++ b/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md @@ -305,6 +305,13 @@ pipeline: Boolean 是否启用同步表、字段注释特性,默认关闭。注意:开启此特性将会对内存使用产生影响。 + + treat-tinyint1-as-boolean + optional + true + Boolean + 是否将TINYINT(1)类型当做Boolean类型处理,默认true。 + diff --git a/docs/content.zh/docs/faq/faq.md b/docs/content.zh/docs/faq/faq.md index 0a613f61c78..4aea2e067ea 100644 --- a/docs/content.zh/docs/faq/faq.md +++ b/docs/content.zh/docs/faq/faq.md @@ -208,14 +208,14 @@ restart-strategy.fixed-delay.delay= 30s 2. 如果要同步排除products和orders表之外的整个my_db库,tableList选项值应该类似于‘my_db.(?!products|orders).*’。 ### Q16: MySQL源表中存在TINYINT(1)类型的列,且部分行的数值>1,Pipeline作业下游接收到的数据却是true/false,为什么? -这是由于MySQL连接参数`tinyInt1isBit`默认值为`true`,导致TINYINT(1)类型的数据被解析为布尔值。 -若需将其转换为实际值,请在source节点添加配置`jdbc.properties.tinyInt1isBit: false`。 +这是由于MySQL连接参数`tinyInt1isBit`默认值为`true`,Flink CDC 3.3.0之前的版本未处理该参数,导致TINYINT(1)类型的数据被解析为布尔值。 +若需将其转换为实际值,请将CDC升级至3.3.0+,并在source节点添加配置`treat-tinyint1-as-boolean: false`。 例如: ```yaml source: type: mysql ... - jdbc.properties.tinyInt1isBit: false + treat-tinyint1-as-boolean: false sink: type: ... diff --git a/docs/content/docs/connectors/pipeline-connectors/mysql.md b/docs/content/docs/connectors/pipeline-connectors/mysql.md index df0faec4d40..621b94440f4 100644 --- a/docs/content/docs/connectors/pipeline-connectors/mysql.md +++ b/docs/content/docs/connectors/pipeline-connectors/mysql.md @@ -313,6 +313,13 @@ pipeline: Whether enable include table and column comments, by default is false, if set to true, the table and column comments will be sent.
Note: Enable this option will bring the implications on memory usage. + + treat-tinyint1-as-boolean + optional + true + Boolean + Whether treat TINYINT(1) as boolean, by default is true. + diff --git a/docs/content/docs/faq/faq.md b/docs/content/docs/faq/faq.md index 4042378c4d7..ab5b2d064cc 100644 --- a/docs/content/docs/faq/faq.md +++ b/docs/content/docs/faq/faq.md @@ -211,14 +211,14 @@ The reason for this problem is that the reading of the full volume phase of the 2. If you need to synchronize the whole mydb database excluding the products and orders tables, the `tableList` option value should like 'my_db.(?!products|orders).*'. ### Q16: In MySQL source table, there is a TINYINT(1) column where some rows contain values greater than 1. However, downstreams receive this data as true/false in the pipeline job. Why does this happen? -This is because the default value of the MySQL connection parameter `tinyInt1isBit` is true, which causes the TINYINT(1) data to be interpreted as boolean values. -To convert it to actual values, please add the configuration `jdbc.properties.tinyInt1isBit: false` at the source node. +This is because the default value of the MySQL connection parameter `tinyInt1isBit` is true and the version of Flink CDC before 3.3.0 didn't convert it, which causes the TINYINT(1) data to be interpreted as boolean values. +To convert it to actual values, please upgrade your CDC version to 3.3.0+ then add the configuration `treat-tinyint1-as-boolean: false` at the source node. For example: ```yaml source: type: mysql ... - jdbc.properties.tinyInt1isBit: false + treat-tinyint1-as-boolean: false sink: type: ... diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java index 41e60890e22..526a547779b 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java @@ -41,6 +41,7 @@ import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.catalog.ObjectPath; +import com.mysql.cj.conf.PropertyKey; import io.debezium.relational.RelationalDatabaseConnectorConfig; import io.debezium.relational.Tables; import org.slf4j.Logger; @@ -91,6 +92,7 @@ import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SERVER_TIME_ZONE; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES_EXCLUDE; +import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TREAT_TINYINT1_AS_BOOLEAN; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.USERNAME; import static org.apache.flink.cdc.connectors.mysql.source.utils.ObjectUtils.doubleCompare; import static org.apache.flink.cdc.debezium.table.DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX; @@ -136,6 +138,7 @@ public DataSource createDataSource(Context context) { boolean closeIdleReaders = config.get(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED); boolean includeComments = config.get(INCLUDE_COMMENTS_ENABLED); + boolean treatTinyInt1AsBoolean = config.get(TREAT_TINYINT1_AS_BOOLEAN); Duration heartbeatInterval = config.get(HEARTBEAT_INTERVAL); Duration connectTimeout = config.get(CONNECT_TIMEOUT); @@ -164,6 +167,11 @@ public DataSource createDataSource(Context context) { "true"); } + if (!treatTinyInt1AsBoolean) { + // set jdbc config 'tinyInt1isBit' to false + configMap.put(PROPERTIES_PREFIX + PropertyKey.tinyInt1isBit.getKeyName(), "false"); + } + MySqlSourceConfigFactory configFactory = new MySqlSourceConfigFactory() .hostname(hostname) @@ -189,7 +197,8 @@ public DataSource createDataSource(Context context) { .debeziumProperties(getDebeziumProperties(configMap)) .jdbcProperties(getJdbcProperties(configMap)) .scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled) - .parseOnLineSchemaChanges(isParsingOnLineSchemaChanges); + .parseOnLineSchemaChanges(isParsingOnLineSchemaChanges) + .treatTinyInt1AsBoolean(treatTinyInt1AsBoolean); List tableIds = MySqlSchemaUtils.listTables(configFactory.createConfig(0), null); diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSource.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSource.java index 55fd70b30a8..81912b4a847 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSource.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSource.java @@ -72,7 +72,7 @@ public EventSourceProvider getEventSourceProvider() { sourceConfig.isIncludeSchemaChanges(), readableMetadataList, includeComments, - sourceConfig.getTinyInt1isBit()); + sourceConfig.isTreatTinyInt1AsBoolean()); MySqlSource source = new MySqlSource<>( diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java index 3747212441c..22fbabe5ede 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java @@ -298,4 +298,11 @@ public class MySqlDataSourceOptions { .withDescription( "Whether enable include table and column comments, by default is false, if set to true, table and column comments will be sent. " + "Note: Enable this option will bring the implications on memory usage."); + + @Experimental + public static final ConfigOption TREAT_TINYINT1_AS_BOOLEAN = + ConfigOptions.key("treat-tinyint1-as-boolean") + .booleanType() + .defaultValue(true) + .withDescription("Whether treat TINYINT(1) as boolean, by default is true. "); } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlEventDeserializer.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlEventDeserializer.java index 15ad4b4b53b..7fee4ebfdd4 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlEventDeserializer.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlEventDeserializer.java @@ -71,8 +71,15 @@ public class MySqlEventDeserializer extends DebeziumEventDeserializationSchema { private List readableMetadataList; public MySqlEventDeserializer( - DebeziumChangelogMode changelogMode, boolean includeSchemaChanges, boolean tinyInt1isBit) { - this(changelogMode, includeSchemaChanges, new ArrayList<>(), false, true); + DebeziumChangelogMode changelogMode, + boolean includeSchemaChanges, + boolean tinyInt1isBit) { + this( + changelogMode, + includeSchemaChanges, + new ArrayList<>(), + includeSchemaChanges, + tinyInt1isBit); } public MySqlEventDeserializer( diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java index d72d726840e..143efa120b0 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java @@ -203,7 +203,7 @@ private Schema parseDDL(String ddlStatement, TableId tableId) { String colName = column.name(); DataType dataType = - MySqlTypeUtils.fromDbzColumn(column, sourceConfig.getTinyInt1isBit()); + MySqlTypeUtils.fromDbzColumn(column, sourceConfig.isTreatTinyInt1AsBoolean()); if (!column.isOptional()) { dataType = dataType.notNull(); } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlSchemaUtils.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlSchemaUtils.java index 08b2179c5f7..616e28b1db6 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlSchemaUtils.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlSchemaUtils.java @@ -129,7 +129,7 @@ public static Schema getTableSchema( new MySqlSchema(sourceConfig, jdbc.isTableIdCaseSensitive())) { TableChanges.TableChange tableSchema = mySqlSchema.getTableSchema(partition, jdbc, toDbzTableId(tableId)); - return toSchema(tableSchema.getTable(), sourceConfig.getTinyInt1isBit()); + return toSchema(tableSchema.getTable(), sourceConfig.isTreatTinyInt1AsBoolean()); } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlMetadataAccessorITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlMetadataAccessorITCase.java index 45b998bbc77..96f3b5f92b3 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlMetadataAccessorITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlMetadataAccessorITCase.java @@ -32,7 +32,6 @@ import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import com.mysql.cj.conf.PropertyKey; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; @@ -42,7 +41,6 @@ import java.time.ZoneId; import java.util.Arrays; import java.util.List; -import java.util.Properties; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -537,9 +535,6 @@ private MySqlSourceConfig getConfig( .map(tableName -> database.getDatabaseName() + "." + tableName) .toArray(String[]::new); - Properties jdbcProperties = new Properties(); - jdbcProperties.put(PropertyKey.tinyInt1isBit.getKeyName(), String.valueOf(tinyint1IsBit)); - return new MySqlSourceConfigFactory() .startupOptions(StartupOptions.latest()) .databaseList(database.getDatabaseName()) @@ -552,7 +547,7 @@ private MySqlSourceConfig getConfig( .username(database.getUsername()) .password(database.getPassword()) .serverTimeZone(ZoneId.of("UTC").toString()) - .jdbcProperties(jdbcProperties) + .treatTinyInt1AsBoolean(tinyint1IsBit) .createConfig(0); } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java index d52a4f7be5b..3e338a68ca6 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java @@ -55,7 +55,6 @@ import org.apache.flink.table.planner.factories.TestValuesTableFactory; import org.apache.flink.util.CloseableIterator; -import com.mysql.cj.conf.PropertyKey; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; @@ -73,7 +72,6 @@ import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Properties; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -449,9 +447,6 @@ public void testParseAlterStatement(boolean tinyInt1isBit) throws Exception { env.setParallelism(1); inventoryDatabase.createAndInitialize(); - Properties jdbcProperties = new Properties(); - jdbcProperties.put(PropertyKey.tinyInt1isBit.getKeyName(), String.valueOf(tinyInt1isBit)); - MySqlSourceConfigFactory configFactory = new MySqlSourceConfigFactory() .hostname(MYSQL8_CONTAINER.getHost()) @@ -463,7 +458,7 @@ public void testParseAlterStatement(boolean tinyInt1isBit) throws Exception { .startupOptions(StartupOptions.latest()) .serverId(getServerId(env.getParallelism())) .serverTimeZone("UTC") - .jdbcProperties(jdbcProperties) + .treatTinyInt1AsBoolean(tinyInt1isBit) .includeSchemaChanges(SCHEMA_CHANGE_ENABLED.defaultValue()); FlinkSourceProvider sourceProvider = @@ -613,9 +608,6 @@ public void testSchemaChangeEvents(boolean tinyInt1isBit) throws Exception { env.setParallelism(1); inventoryDatabase.createAndInitialize(); - Properties jdbcProperties = new Properties(); - jdbcProperties.put(PropertyKey.tinyInt1isBit.getKeyName(), String.valueOf(tinyInt1isBit)); - MySqlSourceConfigFactory configFactory = new MySqlSourceConfigFactory() .hostname(MYSQL8_CONTAINER.getHost()) @@ -627,7 +619,7 @@ public void testSchemaChangeEvents(boolean tinyInt1isBit) throws Exception { .startupOptions(StartupOptions.latest()) .serverId(getServerId(env.getParallelism())) .serverTimeZone("UTC") - .jdbcProperties(jdbcProperties) + .treatTinyInt1AsBoolean(tinyInt1isBit) .includeSchemaChanges(SCHEMA_CHANGE_ENABLED.defaultValue()); FlinkSourceProvider sourceProvider = @@ -675,6 +667,19 @@ public void testSchemaChangeEvents(boolean tinyInt1isBit) throws Exception { ? DataTypes.BOOLEAN() : DataTypes.TINYINT()))))); + // Add a new BOOLEAN column + statement.execute( + String.format( + "ALTER TABLE `%s`.`customers` ADD COLUMN `new_bool_col1` bool NULL;", + inventoryDatabase.getDatabaseName())); + expected.add( + new AddColumnEvent( + TableId.tableId(inventoryDatabase.getDatabaseName(), "customers"), + Collections.singletonList( + new AddColumnEvent.ColumnWithPosition( + Column.physicalColumn( + "new_bool_col1", DataTypes.BOOLEAN()))))); + // Test MODIFY COLUMN DDL statement.execute( String.format( diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java index f335fb73f74..dee96802cce 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java @@ -253,7 +253,7 @@ private boolean shouldEmit(SourceRecord sourceRecord) { ChunkUtils.getChunkKeyColumnType( statefulTaskContext.getDatabaseSchema().tableFor(tableId), statefulTaskContext.getSourceConfig().getChunkKeyColumns(), - statefulTaskContext.getSourceConfig().getTinyInt1isBit()); + statefulTaskContext.getSourceConfig().isTreatTinyInt1AsBoolean()); Struct target = RecordUtils.getStructContainsChunkKey(sourceRecord); Object[] chunkKey = diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java index c7bd4e5ea0d..5c069ee3d58 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java @@ -150,7 +150,8 @@ private void analyzeTable(MySqlPartition partition, TableId tableId) { ChunkUtils.getChunkKeyColumn( currentSplittingTable, sourceConfig.getChunkKeyColumns()); splitType = - ChunkUtils.getChunkKeyColumnType(splitColumn, sourceConfig.getTinyInt1isBit()); + ChunkUtils.getChunkKeyColumnType( + splitColumn, sourceConfig.isTreatTinyInt1AsBoolean()); minMaxOfSplitColumn = StatementUtils.queryMinMax(jdbcConnection, tableId, splitColumn.name()); approximateRowCnt = StatementUtils.queryApproximateRowCnt(jdbcConnection, tableId); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java index 8d1b86eea4b..56d51051615 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java @@ -22,7 +22,6 @@ import org.apache.flink.cdc.connectors.mysql.table.StartupOptions; import org.apache.flink.table.catalog.ObjectPath; -import com.mysql.cj.conf.PropertyKey; import io.debezium.config.Configuration; import io.debezium.connector.mysql.MySqlConnectorConfig; import io.debezium.relational.RelationalTableFilters; @@ -75,6 +74,7 @@ public class MySqlSourceConfig implements Serializable { private final Properties dbzProperties; private final Configuration dbzConfiguration; private final MySqlConnectorConfig dbzMySqlConfig; + private final boolean treatTinyInt1AsBoolean; MySqlSourceConfig( String hostname, @@ -102,7 +102,8 @@ public class MySqlSourceConfig implements Serializable { Properties jdbcProperties, Map chunkKeyColumns, boolean skipSnapshotBackfill, - boolean parseOnLineSchemaChanges) { + boolean parseOnLineSchemaChanges, + boolean treatTinyInt1AsBoolean) { this.hostname = checkNotNull(hostname); this.port = port; this.username = checkNotNull(username); @@ -131,6 +132,7 @@ public class MySqlSourceConfig implements Serializable { this.chunkKeyColumns = chunkKeyColumns; this.skipSnapshotBackfill = skipSnapshotBackfill; this.parseOnLineSchemaChanges = parseOnLineSchemaChanges; + this.treatTinyInt1AsBoolean = treatTinyInt1AsBoolean; } public String getHostname() { @@ -263,8 +265,7 @@ public boolean isSkipSnapshotBackfill() { return skipSnapshotBackfill; } - public boolean getTinyInt1isBit() { - return Boolean.parseBoolean( - jdbcProperties.getProperty(PropertyKey.tinyInt1isBit.getKeyName(), "true")); + public boolean isTreatTinyInt1AsBoolean() { + return treatTinyInt1AsBoolean; } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java index f0ca4cc9607..7bbd1017d6f 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java @@ -71,6 +71,7 @@ public class MySqlSourceConfigFactory implements Serializable { private Map chunkKeyColumns = new HashMap<>(); private boolean skipSnapshotBackfill = false; private boolean parseOnLineSchemaChanges = false; + private boolean treatTinyInt1AsBoolean = false; public MySqlSourceConfigFactory hostname(String hostname) { this.hostname = hostname; @@ -298,6 +299,11 @@ public MySqlSourceConfigFactory parseOnLineSchemaChanges(boolean parseOnLineSche return this; } + public MySqlSourceConfigFactory treatTinyInt1AsBoolean(boolean treatTinyInt1AsBoolean) { + this.treatTinyInt1AsBoolean = treatTinyInt1AsBoolean; + return this; + } + /** Creates a new {@link MySqlSourceConfig} for the given subtask {@code subtaskId}. */ public MySqlSourceConfig createConfig(int subtaskId) { // hard code server name, because we don't need to distinguish it, docs: @@ -392,6 +398,7 @@ public MySqlSourceConfig createConfig(int subtaskId, String serverName) { jdbcProperties, chunkKeyColumns, skipSnapshotBackfill, - parseOnLineSchemaChanges); + parseOnLineSchemaChanges, + treatTinyInt1AsBoolean); } } From e536531e49db1501f5ec7172399882169174823e Mon Sep 17 00:00:00 2001 From: "north.lin" <37775475+qg-lin@users.noreply.github.com> Date: Thu, 16 Jan 2025 12:52:03 +0800 Subject: [PATCH 8/9] Update MySqlChunkSplitter.java --- .../mysql/source/assigners/MySqlChunkSplitter.java | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java index 5c069ee3d58..df4df026d47 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java @@ -32,7 +32,6 @@ import org.apache.flink.table.types.logical.RowType; import org.apache.flink.util.Preconditions; -import com.mysql.cj.conf.PropertyKey; import io.debezium.connector.mysql.MySqlPartition; import io.debezium.jdbc.JdbcConnection; import io.debezium.relational.Column; @@ -388,12 +387,7 @@ private int getDynamicChunkSize( Object max, int chunkSize, long approximateRowCnt) { - boolean tinyInt1isBit = - Boolean.parseBoolean( - sourceConfig - .getJdbcProperties() - .getProperty(PropertyKey.tinyInt1isBit.getKeyName(), "true")); - if (!isEvenlySplitColumn(splitColumn, tinyInt1isBit)) { + if (!isEvenlySplitColumn(splitColumn, sourceConfig.isTreatTinyInt1AsBoolean())) { return -1; } final double distributionFactorUpper = sourceConfig.getDistributionFactorUpper(); From e5be4171de59f303056bfb0690f24cb9c69b336a Mon Sep 17 00:00:00 2001 From: "north.lin" <37775475+qg-lin@users.noreply.github.com> Date: Thu, 16 Jan 2025 13:35:37 +0800 Subject: [PATCH 9/9] change param name --- docs/content.zh/docs/connectors/pipeline-connectors/mysql.md | 2 +- docs/content.zh/docs/faq/faq.md | 4 ++-- docs/content/docs/connectors/pipeline-connectors/mysql.md | 2 +- docs/content/docs/faq/faq.md | 4 ++-- .../cdc/connectors/mysql/factory/MySqlDataSourceFactory.java | 4 ++-- .../cdc/connectors/mysql/source/MySqlDataSourceOptions.java | 4 ++-- .../mysql/source/config/MySqlSourceConfigFactory.java | 2 +- 7 files changed, 11 insertions(+), 11 deletions(-) diff --git a/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md b/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md index e898c5c2540..afd1897167b 100644 --- a/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md +++ b/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md @@ -306,7 +306,7 @@ pipeline: 是否启用同步表、字段注释特性,默认关闭。注意:开启此特性将会对内存使用产生影响。 - treat-tinyint1-as-boolean + treat-tinyint1-as-boolean.enabled optional true Boolean diff --git a/docs/content.zh/docs/faq/faq.md b/docs/content.zh/docs/faq/faq.md index 4aea2e067ea..abacaf54808 100644 --- a/docs/content.zh/docs/faq/faq.md +++ b/docs/content.zh/docs/faq/faq.md @@ -209,13 +209,13 @@ restart-strategy.fixed-delay.delay= 30s ### Q16: MySQL源表中存在TINYINT(1)类型的列,且部分行的数值>1,Pipeline作业下游接收到的数据却是true/false,为什么? 这是由于MySQL连接参数`tinyInt1isBit`默认值为`true`,Flink CDC 3.3.0之前的版本未处理该参数,导致TINYINT(1)类型的数据被解析为布尔值。 -若需将其转换为实际值,请将CDC升级至3.3.0+,并在source节点添加配置`treat-tinyint1-as-boolean: false`。 +若需将其转换为实际值,请将CDC升级至3.3.0+,并在source节点添加配置`treat-tinyint1-as-boolean.enabled: false`。 例如: ```yaml source: type: mysql ... - treat-tinyint1-as-boolean: false + treat-tinyint1-as-boolean.enabled: false sink: type: ... diff --git a/docs/content/docs/connectors/pipeline-connectors/mysql.md b/docs/content/docs/connectors/pipeline-connectors/mysql.md index 621b94440f4..9bcd2a6179f 100644 --- a/docs/content/docs/connectors/pipeline-connectors/mysql.md +++ b/docs/content/docs/connectors/pipeline-connectors/mysql.md @@ -314,7 +314,7 @@ pipeline: Note: Enable this option will bring the implications on memory usage. - treat-tinyint1-as-boolean + treat-tinyint1-as-boolean.enabled optional true Boolean diff --git a/docs/content/docs/faq/faq.md b/docs/content/docs/faq/faq.md index ab5b2d064cc..a74c3550667 100644 --- a/docs/content/docs/faq/faq.md +++ b/docs/content/docs/faq/faq.md @@ -212,13 +212,13 @@ The reason for this problem is that the reading of the full volume phase of the ### Q16: In MySQL source table, there is a TINYINT(1) column where some rows contain values greater than 1. However, downstreams receive this data as true/false in the pipeline job. Why does this happen? This is because the default value of the MySQL connection parameter `tinyInt1isBit` is true and the version of Flink CDC before 3.3.0 didn't convert it, which causes the TINYINT(1) data to be interpreted as boolean values. -To convert it to actual values, please upgrade your CDC version to 3.3.0+ then add the configuration `treat-tinyint1-as-boolean: false` at the source node. +To convert it to actual values, please upgrade your CDC version to 3.3.0+ then add the configuration `treat-tinyint1-as-boolean.enabled: false` at the source node. For example: ```yaml source: type: mysql ... - treat-tinyint1-as-boolean: false + treat-tinyint1-as-boolean.enabled: false sink: type: ... diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java index 526a547779b..48819c50c11 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java @@ -92,7 +92,7 @@ import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SERVER_TIME_ZONE; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES_EXCLUDE; -import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TREAT_TINYINT1_AS_BOOLEAN; +import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TREAT_TINYINT1_AS_BOOLEAN_ENABLED; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.USERNAME; import static org.apache.flink.cdc.connectors.mysql.source.utils.ObjectUtils.doubleCompare; import static org.apache.flink.cdc.debezium.table.DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX; @@ -138,7 +138,7 @@ public DataSource createDataSource(Context context) { boolean closeIdleReaders = config.get(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED); boolean includeComments = config.get(INCLUDE_COMMENTS_ENABLED); - boolean treatTinyInt1AsBoolean = config.get(TREAT_TINYINT1_AS_BOOLEAN); + boolean treatTinyInt1AsBoolean = config.get(TREAT_TINYINT1_AS_BOOLEAN_ENABLED); Duration heartbeatInterval = config.get(HEARTBEAT_INTERVAL); Duration connectTimeout = config.get(CONNECT_TIMEOUT); diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java index 22fbabe5ede..89878d0eab9 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java @@ -300,8 +300,8 @@ public class MySqlDataSourceOptions { + "Note: Enable this option will bring the implications on memory usage."); @Experimental - public static final ConfigOption TREAT_TINYINT1_AS_BOOLEAN = - ConfigOptions.key("treat-tinyint1-as-boolean") + public static final ConfigOption TREAT_TINYINT1_AS_BOOLEAN_ENABLED = + ConfigOptions.key("treat-tinyint1-as-boolean.enabled") .booleanType() .defaultValue(true) .withDescription("Whether treat TINYINT(1) as boolean, by default is true. "); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java index 7bbd1017d6f..c1459bf23f4 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java @@ -71,7 +71,7 @@ public class MySqlSourceConfigFactory implements Serializable { private Map chunkKeyColumns = new HashMap<>(); private boolean skipSnapshotBackfill = false; private boolean parseOnLineSchemaChanges = false; - private boolean treatTinyInt1AsBoolean = false; + private boolean treatTinyInt1AsBoolean = true; public MySqlSourceConfigFactory hostname(String hostname) { this.hostname = hostname;