diff --git a/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md b/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md
index 0de46fd49db..afd1897167b 100644
--- a/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md
+++ b/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md
@@ -305,6 +305,13 @@ pipeline:
Boolean |
是否启用同步表、字段注释特性,默认关闭。注意:开启此特性将会对内存使用产生影响。 |
+
+ | treat-tinyint1-as-boolean.enabled |
+ optional |
+ true |
+ Boolean |
+ 是否将TINYINT(1)类型当做Boolean类型处理,默认true。 |
+
diff --git a/docs/content.zh/docs/faq/faq.md b/docs/content.zh/docs/faq/faq.md
index 54b55fe9931..abacaf54808 100644
--- a/docs/content.zh/docs/faq/faq.md
+++ b/docs/content.zh/docs/faq/faq.md
@@ -207,6 +207,19 @@ restart-strategy.fixed-delay.delay= 30s
1. tableList选项要求表名使用数据库名,而不是DataStream API中的表名。对于MySQL CDC源代码,tableList选项值应该类似于‘my_db.my_table’。
2. 如果要同步排除products和orders表之外的整个my_db库,tableList选项值应该类似于‘my_db.(?!products|orders).*’。
+### Q16: MySQL源表中存在TINYINT(1)类型的列,且部分行的数值>1,Pipeline作业下游接收到的数据却是true/false,为什么?
+这是由于MySQL连接参数`tinyInt1isBit`默认值为`true`,Flink CDC 3.3.0之前的版本未处理该参数,导致TINYINT(1)类型的数据被解析为布尔值。
+若需将其转换为实际值,请将CDC升级至3.3.0+,并在source节点添加配置`treat-tinyint1-as-boolean.enabled: false`。
+例如:
+```yaml
+source:
+ type: mysql
+ ...
+ treat-tinyint1-as-boolean.enabled: false
+
+sink:
+ type: ...
+```
## Postgres CDC FAQ
### Q1: 发现 PG 服务器磁盘使用率高,WAL 不释放 是什么原因?
diff --git a/docs/content/docs/connectors/pipeline-connectors/mysql.md b/docs/content/docs/connectors/pipeline-connectors/mysql.md
index df0faec4d40..9bcd2a6179f 100644
--- a/docs/content/docs/connectors/pipeline-connectors/mysql.md
+++ b/docs/content/docs/connectors/pipeline-connectors/mysql.md
@@ -313,6 +313,13 @@ pipeline:
Whether enable include table and column comments, by default is false, if set to true, the table and column comments will be sent.
Note: Enable this option will bring the implications on memory usage. |
+
+ | treat-tinyint1-as-boolean.enabled |
+ optional |
+ true |
+ Boolean |
+ Whether treat TINYINT(1) as boolean, by default is true. |
+
diff --git a/docs/content/docs/faq/faq.md b/docs/content/docs/faq/faq.md
index 2b57167d5ab..a74c3550667 100644
--- a/docs/content/docs/faq/faq.md
+++ b/docs/content/docs/faq/faq.md
@@ -210,6 +210,19 @@ The reason for this problem is that the reading of the full volume phase of the
1. The `tableList` option requires table name with database name rather than table name in DataStream API. For MySQL CDC source, the `tableList` option value should like ‘my_db.my_table’.
2. If you need to synchronize the whole mydb database excluding the products and orders tables, the `tableList` option value should like 'my_db.(?!products|orders).*'.
+### Q16: In MySQL source table, there is a TINYINT(1) column where some rows contain values greater than 1. However, downstreams receive this data as true/false in the pipeline job. Why does this happen?
+This is because the default value of the MySQL connection parameter `tinyInt1isBit` is true and the version of Flink CDC before 3.3.0 didn't convert it, which causes the TINYINT(1) data to be interpreted as boolean values.
+To convert it to actual values, please upgrade your CDC version to 3.3.0+ then add the configuration `treat-tinyint1-as-boolean.enabled: false` at the source node.
+For example:
+```yaml
+source:
+ type: mysql
+ ...
+ treat-tinyint1-as-boolean.enabled: false
+
+sink:
+ type: ...
+```
## Postgres CDC FAQ
### Q1: It is found that the disk utilization rate of PG server is high. What is the reason why wal is not released?
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java
index 41e60890e22..48819c50c11 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java
@@ -41,6 +41,7 @@
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.ObjectPath;
+import com.mysql.cj.conf.PropertyKey;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.Tables;
import org.slf4j.Logger;
@@ -91,6 +92,7 @@
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SERVER_TIME_ZONE;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES_EXCLUDE;
+import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TREAT_TINYINT1_AS_BOOLEAN_ENABLED;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.USERNAME;
import static org.apache.flink.cdc.connectors.mysql.source.utils.ObjectUtils.doubleCompare;
import static org.apache.flink.cdc.debezium.table.DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX;
@@ -136,6 +138,7 @@ public DataSource createDataSource(Context context) {
boolean closeIdleReaders = config.get(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
boolean includeComments = config.get(INCLUDE_COMMENTS_ENABLED);
+ boolean treatTinyInt1AsBoolean = config.get(TREAT_TINYINT1_AS_BOOLEAN_ENABLED);
Duration heartbeatInterval = config.get(HEARTBEAT_INTERVAL);
Duration connectTimeout = config.get(CONNECT_TIMEOUT);
@@ -164,6 +167,11 @@ public DataSource createDataSource(Context context) {
"true");
}
+ if (!treatTinyInt1AsBoolean) {
+ // set jdbc config 'tinyInt1isBit' to false
+ configMap.put(PROPERTIES_PREFIX + PropertyKey.tinyInt1isBit.getKeyName(), "false");
+ }
+
MySqlSourceConfigFactory configFactory =
new MySqlSourceConfigFactory()
.hostname(hostname)
@@ -189,7 +197,8 @@ public DataSource createDataSource(Context context) {
.debeziumProperties(getDebeziumProperties(configMap))
.jdbcProperties(getJdbcProperties(configMap))
.scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled)
- .parseOnLineSchemaChanges(isParsingOnLineSchemaChanges);
+ .parseOnLineSchemaChanges(isParsingOnLineSchemaChanges)
+ .treatTinyInt1AsBoolean(treatTinyInt1AsBoolean);
List tableIds = MySqlSchemaUtils.listTables(configFactory.createConfig(0), null);
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSource.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSource.java
index 7b4ee5eab07..81912b4a847 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSource.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSource.java
@@ -71,7 +71,8 @@ public EventSourceProvider getEventSourceProvider() {
DebeziumChangelogMode.ALL,
sourceConfig.isIncludeSchemaChanges(),
readableMetadataList,
- includeComments);
+ includeComments,
+ sourceConfig.isTreatTinyInt1AsBoolean());
MySqlSource source =
new MySqlSource<>(
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java
index 3747212441c..89878d0eab9 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java
@@ -298,4 +298,11 @@ public class MySqlDataSourceOptions {
.withDescription(
"Whether enable include table and column comments, by default is false, if set to true, table and column comments will be sent. "
+ "Note: Enable this option will bring the implications on memory usage.");
+
+ @Experimental
+ public static final ConfigOption TREAT_TINYINT1_AS_BOOLEAN_ENABLED =
+ ConfigOptions.key("treat-tinyint1-as-boolean.enabled")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription("Whether treat TINYINT(1) as boolean, by default is true. ");
}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlEventDeserializer.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlEventDeserializer.java
index 004fc5e1abe..7fee4ebfdd4 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlEventDeserializer.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlEventDeserializer.java
@@ -62,6 +62,7 @@ public class MySqlEventDeserializer extends DebeziumEventDeserializationSchema {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private final boolean includeSchemaChanges;
+ private final boolean tinyInt1isBit;
private final boolean includeComments;
private transient Tables tables;
@@ -70,26 +71,35 @@ public class MySqlEventDeserializer extends DebeziumEventDeserializationSchema {
private List readableMetadataList;
public MySqlEventDeserializer(
- DebeziumChangelogMode changelogMode, boolean includeSchemaChanges) {
- this(changelogMode, includeSchemaChanges, new ArrayList<>(), false);
+ DebeziumChangelogMode changelogMode,
+ boolean includeSchemaChanges,
+ boolean tinyInt1isBit) {
+ this(
+ changelogMode,
+ includeSchemaChanges,
+ new ArrayList<>(),
+ includeSchemaChanges,
+ tinyInt1isBit);
}
public MySqlEventDeserializer(
DebeziumChangelogMode changelogMode,
boolean includeSchemaChanges,
List readableMetadataList,
- boolean includeComments) {
+ boolean includeComments,
+ boolean tinyInt1isBit) {
super(new MySqlSchemaDataTypeInference(), changelogMode);
this.includeSchemaChanges = includeSchemaChanges;
this.readableMetadataList = readableMetadataList;
this.includeComments = includeComments;
+ this.tinyInt1isBit = tinyInt1isBit;
}
@Override
protected List deserializeSchemaChangeRecord(SourceRecord record) {
if (includeSchemaChanges) {
if (customParser == null) {
- customParser = new CustomMySqlAntlrDdlParser(includeComments);
+ customParser = new CustomMySqlAntlrDdlParser(includeComments, tinyInt1isBit);
tables = new Tables();
}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java
index 3b30b3c4940..31ee183e53c 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java
@@ -60,6 +60,7 @@ public class CustomAlterTableParserListener extends MySqlParserBaseListener {
private final MySqlAntlrDdlParser parser;
private final List listeners;
private final LinkedList changes;
+ private final boolean tinyInt1isBit;
private org.apache.flink.cdc.common.event.TableId currentTable;
private List columnEditors;
private CustomColumnDefinitionParserListener columnDefinitionListener;
@@ -70,10 +71,12 @@ public class CustomAlterTableParserListener extends MySqlParserBaseListener {
public CustomAlterTableParserListener(
MySqlAntlrDdlParser parser,
List listeners,
- LinkedList changes) {
+ LinkedList changes,
+ boolean tinyInt1isBit) {
this.parser = parser;
this.listeners = listeners;
this.changes = changes;
+ this.tinyInt1isBit = tinyInt1isBit;
}
@Override
@@ -315,7 +318,7 @@ public void exitAlterByChangeColumn(MySqlParser.AlterByChangeColumnContext ctx)
String newColumnName = parser.parseName(ctx.newColumn);
Map typeMapping = new HashMap<>();
- typeMapping.put(column.name(), fromDbzColumn(column));
+ typeMapping.put(column.name(), fromDbzColumn(column, tinyInt1isBit));
changes.add(new AlterColumnTypeEvent(currentTable, typeMapping));
if (newColumnName != null && !column.name().equalsIgnoreCase(newColumnName)) {
@@ -366,7 +369,7 @@ public void exitAlterByModifyColumn(MySqlParser.AlterByModifyColumnContext ctx)
() -> {
Column column = columnDefinitionListener.getColumn();
Map typeMapping = new HashMap<>();
- typeMapping.put(column.name(), fromDbzColumn(column));
+ typeMapping.put(column.name(), fromDbzColumn(column, tinyInt1isBit));
changes.add(new AlterColumnTypeEvent(currentTable, typeMapping));
listeners.remove(columnDefinitionListener);
},
@@ -413,7 +416,7 @@ public void exitDropTable(MySqlParser.DropTableContext ctx) {
private org.apache.flink.cdc.common.schema.Column toCdcColumn(Column dbzColumn) {
return org.apache.flink.cdc.common.schema.Column.physicalColumn(
dbzColumn.name(),
- fromDbzColumn(dbzColumn),
+ fromDbzColumn(dbzColumn, tinyInt1isBit),
dbzColumn.comment(),
dbzColumn.defaultValueExpression().orElse(null));
}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomMySqlAntlrDdlParser.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomMySqlAntlrDdlParser.java
index 624d1ac416c..9a29ebe469d 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomMySqlAntlrDdlParser.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomMySqlAntlrDdlParser.java
@@ -35,10 +35,12 @@
public class CustomMySqlAntlrDdlParser extends MySqlAntlrDdlParser {
private final LinkedList parsedEvents;
+ private final boolean tinyInt1isBit;
- public CustomMySqlAntlrDdlParser(boolean includeComments) {
+ public CustomMySqlAntlrDdlParser(boolean includeComments, boolean tinyInt1isBit) {
super(true, false, includeComments, null, Tables.TableFilter.includeAll());
this.parsedEvents = new LinkedList<>();
+ this.tinyInt1isBit = tinyInt1isBit;
}
// Overriding this method because the BIT type requires default length dimension of 1.
@@ -278,7 +280,7 @@ protected DataTypeResolver initializeDataTypeResolver() {
@Override
protected AntlrDdlParserListener createParseTreeWalkerListener() {
- return new CustomMySqlAntlrDdlParserListener(this, parsedEvents);
+ return new CustomMySqlAntlrDdlParserListener(this, parsedEvents, tinyInt1isBit);
}
public List getAndClearParsedEvents() {
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomMySqlAntlrDdlParserListener.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomMySqlAntlrDdlParserListener.java
index 445373309fa..7e24e264759 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomMySqlAntlrDdlParserListener.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomMySqlAntlrDdlParserListener.java
@@ -74,12 +74,15 @@ public class CustomMySqlAntlrDdlParserListener extends MySqlParserBaseListener
private final Collection errors = new ArrayList<>();
public CustomMySqlAntlrDdlParserListener(
- MySqlAntlrDdlParser parser, LinkedList parsedEvents) {
+ MySqlAntlrDdlParser parser,
+ LinkedList parsedEvents,
+ boolean tinyInt1isBit) {
// initialize listeners
listeners.add(new CreateAndAlterDatabaseParserListener(parser));
listeners.add(new DropDatabaseParserListener(parser));
listeners.add(new CreateTableParserListener(parser, listeners));
- listeners.add(new CustomAlterTableParserListener(parser, listeners, parsedEvents));
+ listeners.add(
+ new CustomAlterTableParserListener(parser, listeners, parsedEvents, tinyInt1isBit));
listeners.add(new DropTableParserListener(parser));
listeners.add(new RenameTableParserListener(parser));
listeners.add(new TruncateTableParserListener(parser));
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java
index b5a6ec197d6..143efa120b0 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java
@@ -202,7 +202,8 @@ private Schema parseDDL(String ddlStatement, TableId tableId) {
Column column = columns.get(i);
String colName = column.name();
- DataType dataType = MySqlTypeUtils.fromDbzColumn(column);
+ DataType dataType =
+ MySqlTypeUtils.fromDbzColumn(column, sourceConfig.isTreatTinyInt1AsBoolean());
if (!column.isOptional()) {
dataType = dataType.notNull();
}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlSchemaUtils.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlSchemaUtils.java
index bc4135cd1fb..616e28b1db6 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlSchemaUtils.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlSchemaUtils.java
@@ -129,14 +129,14 @@ public static Schema getTableSchema(
new MySqlSchema(sourceConfig, jdbc.isTableIdCaseSensitive())) {
TableChanges.TableChange tableSchema =
mySqlSchema.getTableSchema(partition, jdbc, toDbzTableId(tableId));
- return toSchema(tableSchema.getTable());
+ return toSchema(tableSchema.getTable(), sourceConfig.isTreatTinyInt1AsBoolean());
}
}
- public static Schema toSchema(Table table) {
+ public static Schema toSchema(Table table, boolean tinyInt1isBit) {
List columns =
table.columns().stream()
- .map(MySqlSchemaUtils::toColumn)
+ .map(column -> toColumn(column, tinyInt1isBit))
.collect(Collectors.toList());
return Schema.newBuilder()
@@ -146,9 +146,11 @@ public static Schema toSchema(Table table) {
.build();
}
- public static Column toColumn(io.debezium.relational.Column column) {
+ public static Column toColumn(io.debezium.relational.Column column, boolean tinyInt1isBit) {
return Column.physicalColumn(
- column.name(), MySqlTypeUtils.fromDbzColumn(column), column.comment());
+ column.name(),
+ MySqlTypeUtils.fromDbzColumn(column, tinyInt1isBit),
+ column.comment());
}
public static io.debezium.relational.TableId toDbzTableId(TableId tableId) {
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlTypeUtils.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlTypeUtils.java
index c82525cf60c..05a23f7cc85 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlTypeUtils.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlTypeUtils.java
@@ -110,8 +110,8 @@ public class MySqlTypeUtils {
private static final int FLOAT_LENGTH_UNSPECIFIED_FLAG = -1;
/** Returns a corresponding Flink data type from a debezium {@link Column}. */
- public static DataType fromDbzColumn(Column column) {
- DataType dataType = convertFromColumn(column);
+ public static DataType fromDbzColumn(Column column, boolean tinyInt1isBit) {
+ DataType dataType = convertFromColumn(column, tinyInt1isBit);
if (column.isOptional()) {
return dataType;
} else {
@@ -123,7 +123,7 @@ public static DataType fromDbzColumn(Column column) {
* Returns a corresponding Flink data type from a debezium {@link Column} with nullable always
* be true.
*/
- private static DataType convertFromColumn(Column column) {
+ private static DataType convertFromColumn(Column column, boolean tinyInt1isBit) {
String typeName = column.typeName();
switch (typeName) {
case BIT:
@@ -138,7 +138,9 @@ private static DataType convertFromColumn(Column column) {
// user should not use tinyint(1) to store number although jdbc url parameter
// tinyInt1isBit=false can help change the return value, it's not a general way
// btw: mybatis and mysql-connector-java map tinyint(1) to boolean by default
- return column.length() == 1 ? DataTypes.BOOLEAN() : DataTypes.TINYINT();
+ return (column.length() == 1 && tinyInt1isBit)
+ ? DataTypes.BOOLEAN()
+ : DataTypes.TINYINT();
case TINYINT_UNSIGNED:
case TINYINT_UNSIGNED_ZEROFILL:
case SMALLINT:
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlMetadataAccessorITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlMetadataAccessorITCase.java
index 94f1ed45dc5..96f3b5f92b3 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlMetadataAccessorITCase.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlMetadataAccessorITCase.java
@@ -102,13 +102,23 @@ public void testMysql8AccessDatabaseAndTable() {
}
@Test
- public void testMysql57AccessCommonTypesSchema() {
- testAccessCommonTypesSchema(fullTypesMySql57Database);
+ public void testMysql57AccessCommonTypesSchemaTinyInt1isBit() {
+ testAccessCommonTypesSchema(fullTypesMySql57Database, true);
}
@Test
- public void testMysql8AccessCommonTypesSchema() {
- testAccessCommonTypesSchema(fullTypesMySql8Database);
+ public void testMysql57AccessCommonTypesSchemaTinyInt1isNotBit() {
+ testAccessCommonTypesSchema(fullTypesMySql57Database, false);
+ }
+
+ @Test
+ public void testMysql8AccessCommonTypesSchemaTinyInt1isBit() {
+ testAccessCommonTypesSchema(fullTypesMySql8Database, true);
+ }
+
+ @Test
+ public void testMysql8AccessCommonTypesSchemaTinyInt1isNotBit() {
+ testAccessCommonTypesSchema(fullTypesMySql8Database, false);
}
@Test
@@ -117,7 +127,7 @@ public void testMysql57AccessTimeTypesSchema() {
String[] tables = new String[] {"time_types"};
MySqlMetadataAccessor metadataAccessor =
- getMetadataAccessor(tables, fullTypesMySql57Database);
+ getMetadataAccessor(tables, fullTypesMySql57Database, true);
Schema actualSchema =
metadataAccessor.getTableSchema(
@@ -163,7 +173,7 @@ public void testMysql8AccessTimeTypesSchema() {
String[] tables = new String[] {"time_types"};
MySqlMetadataAccessor metadataAccessor =
- getMetadataAccessor(tables, fullTypesMySql8Database);
+ getMetadataAccessor(tables, fullTypesMySql8Database, true);
Schema actualSchema =
metadataAccessor.getTableSchema(
@@ -213,7 +223,7 @@ public void testMysql57PrecisionTypesSchema() {
String[] tables = new String[] {"precision_types"};
MySqlMetadataAccessor metadataAccessor =
- getMetadataAccessor(tables, fullTypesMySql57Database);
+ getMetadataAccessor(tables, fullTypesMySql57Database, true);
Schema actualSchema =
metadataAccessor.getTableSchema(
@@ -288,7 +298,7 @@ public void testMysql8PrecisionTypesSchema() {
String[] tables = new String[] {"precision_types"};
MySqlMetadataAccessor metadataAccessor =
- getMetadataAccessor(tables, fullTypesMySql8Database);
+ getMetadataAccessor(tables, fullTypesMySql8Database, false);
Schema actualSchema =
metadataAccessor.getTableSchema(
@@ -361,7 +371,7 @@ private void testAccessDatabaseAndTable(UniqueDatabase database) {
database.createAndInitialize();
String[] tables = new String[] {"common_types", "time_types", "precision_types"};
- MySqlMetadataAccessor metadataAccessor = getMetadataAccessor(tables, database);
+ MySqlMetadataAccessor metadataAccessor = getMetadataAccessor(tables, database, true);
assertThatThrownBy(metadataAccessor::listNamespaces)
.isInstanceOf(UnsupportedOperationException.class);
@@ -377,11 +387,12 @@ private void testAccessDatabaseAndTable(UniqueDatabase database) {
assertThat(actualTables).containsExactlyInAnyOrderElementsOf(expectedTables);
}
- private void testAccessCommonTypesSchema(UniqueDatabase database) {
+ private void testAccessCommonTypesSchema(UniqueDatabase database, boolean tinyint1IsBit) {
database.createAndInitialize();
String[] tables = new String[] {"common_types"};
- MySqlMetadataAccessor metadataAccessor = getMetadataAccessor(tables, database);
+ MySqlMetadataAccessor metadataAccessor =
+ getMetadataAccessor(tables, database, tinyint1IsBit);
Schema actualSchema =
metadataAccessor.getTableSchema(
@@ -427,8 +438,12 @@ private void testAccessCommonTypesSchema(UniqueDatabase database) {
DataTypes.STRING(),
DataTypes.BOOLEAN(),
DataTypes.BINARY(1),
- DataTypes.BOOLEAN(),
- DataTypes.BOOLEAN(),
+ tinyint1IsBit
+ ? DataTypes.BOOLEAN()
+ : DataTypes.TINYINT(),
+ tinyint1IsBit
+ ? DataTypes.BOOLEAN()
+ : DataTypes.TINYINT(),
DataTypes.BINARY(16),
DataTypes.BINARY(8),
DataTypes.STRING(),
@@ -507,12 +522,14 @@ private void testAccessCommonTypesSchema(UniqueDatabase database) {
assertThat(actualSchema).isEqualTo(expectedSchema);
}
- private MySqlMetadataAccessor getMetadataAccessor(String[] tables, UniqueDatabase database) {
- MySqlSourceConfig sourceConfig = getConfig(tables, database);
+ private MySqlMetadataAccessor getMetadataAccessor(
+ String[] tables, UniqueDatabase database, boolean tinyint1IsBit) {
+ MySqlSourceConfig sourceConfig = getConfig(tables, database, tinyint1IsBit);
return new MySqlMetadataAccessor(sourceConfig);
}
- private MySqlSourceConfig getConfig(String[] captureTables, UniqueDatabase database) {
+ private MySqlSourceConfig getConfig(
+ String[] captureTables, UniqueDatabase database, boolean tinyint1IsBit) {
String[] captureTableIds =
Arrays.stream(captureTables)
.map(tableName -> database.getDatabaseName() + "." + tableName)
@@ -530,6 +547,7 @@ private MySqlSourceConfig getConfig(String[] captureTables, UniqueDatabase datab
.username(database.getUsername())
.password(database.getPassword())
.serverTimeZone(ZoneId.of("UTC").toString())
+ .treatTinyInt1AsBoolean(tinyint1IsBit)
.createConfig(0);
}
}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java
index 1252733f4ab..3e338a68ca6 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java
@@ -271,6 +271,16 @@ private static List fetchResultsExcept(Iterator iter, int size, T side
return result;
}
+ @Test
+ public void testParseAlterStatementTinyintIsBit() throws Exception {
+ testParseAlterStatement(true);
+ }
+
+ @Test
+ public void testParseAlterStatementTinyint1IsNotBit() throws Exception {
+ testParseAlterStatement(false);
+ }
+
@Test
public void testInitialStartupModeWithOpTs() throws Exception {
inventoryDatabase.createAndInitialize();
@@ -433,10 +443,10 @@ public void testInitialStartupModeWithOpTs() throws Exception {
}
}
- @Test
- public void testParseAlterStatement() throws Exception {
+ public void testParseAlterStatement(boolean tinyInt1isBit) throws Exception {
env.setParallelism(1);
inventoryDatabase.createAndInitialize();
+
MySqlSourceConfigFactory configFactory =
new MySqlSourceConfigFactory()
.hostname(MYSQL8_CONTAINER.getHost())
@@ -448,6 +458,7 @@ public void testParseAlterStatement() throws Exception {
.startupOptions(StartupOptions.latest())
.serverId(getServerId(env.getParallelism()))
.serverTimeZone("UTC")
+ .treatTinyInt1AsBoolean(tinyInt1isBit)
.includeSchemaChanges(SCHEMA_CHANGE_ENABLED.defaultValue());
FlinkSourceProvider sourceProvider =
@@ -548,6 +559,21 @@ public void testParseAlterStatement() throws Exception {
new AddColumnEvent.ColumnWithPosition(
Column.physicalColumn("cols9", DataTypes.CHAR(1))))));
+ statement.execute(
+ String.format(
+ "ALTER TABLE `%s`.`products` ADD COLUMN `cols10` TINYINT(1) NULL;",
+ inventoryDatabase.getDatabaseName()));
+ expected.add(
+ new AddColumnEvent(
+ tableId,
+ Collections.singletonList(
+ new AddColumnEvent.ColumnWithPosition(
+ Column.physicalColumn(
+ "cols10",
+ tinyInt1isBit
+ ? DataTypes.BOOLEAN()
+ : DataTypes.TINYINT())))));
+
// Drop orders table first to remove foreign key restraints
statement.execute(
String.format(
@@ -569,9 +595,19 @@ public void testParseAlterStatement() throws Exception {
}
@Test
- public void testSchemaChangeEvents() throws Exception {
+ public void testSchemaChangeEventstinyInt1isBit() throws Exception {
+ testSchemaChangeEvents(true);
+ }
+
+ @Test
+ public void testSchemaChangeEventsTinyint1IsNotBit() throws Exception {
+ testSchemaChangeEvents(false);
+ }
+
+ public void testSchemaChangeEvents(boolean tinyInt1isBit) throws Exception {
env.setParallelism(1);
inventoryDatabase.createAndInitialize();
+
MySqlSourceConfigFactory configFactory =
new MySqlSourceConfigFactory()
.hostname(MYSQL8_CONTAINER.getHost())
@@ -583,6 +619,7 @@ public void testSchemaChangeEvents() throws Exception {
.startupOptions(StartupOptions.latest())
.serverId(getServerId(env.getParallelism()))
.serverTimeZone("UTC")
+ .treatTinyInt1AsBoolean(tinyInt1isBit)
.includeSchemaChanges(SCHEMA_CHANGE_ENABLED.defaultValue());
FlinkSourceProvider sourceProvider =
@@ -614,6 +651,35 @@ public void testSchemaChangeEvents() throws Exception {
new AddColumnEvent.ColumnWithPosition(
Column.physicalColumn("newcol1", DataTypes.INT())))));
+ // Add a TINYINT(1) column
+ statement.execute(
+ String.format(
+ "ALTER TABLE `%s`.`customers` ADD COLUMN `new_tinyint1_col1` TINYINT(1) NULL;",
+ inventoryDatabase.getDatabaseName()));
+ expected.add(
+ new AddColumnEvent(
+ TableId.tableId(inventoryDatabase.getDatabaseName(), "customers"),
+ Collections.singletonList(
+ new AddColumnEvent.ColumnWithPosition(
+ Column.physicalColumn(
+ "new_tinyint1_col1",
+ tinyInt1isBit
+ ? DataTypes.BOOLEAN()
+ : DataTypes.TINYINT())))));
+
+ // Add a new BOOLEAN column
+ statement.execute(
+ String.format(
+ "ALTER TABLE `%s`.`customers` ADD COLUMN `new_bool_col1` bool NULL;",
+ inventoryDatabase.getDatabaseName()));
+ expected.add(
+ new AddColumnEvent(
+ TableId.tableId(inventoryDatabase.getDatabaseName(), "customers"),
+ Collections.singletonList(
+ new AddColumnEvent.ColumnWithPosition(
+ Column.physicalColumn(
+ "new_bool_col1", DataTypes.BOOLEAN())))));
+
// Test MODIFY COLUMN DDL
statement.execute(
String.format(
@@ -625,6 +691,16 @@ public void testSchemaChangeEvents() throws Exception {
TableId.tableId(inventoryDatabase.getDatabaseName(), "customers"),
Collections.singletonMap("newcol1", DataTypes.DOUBLE())));
+ statement.execute(
+ String.format(
+ "ALTER TABLE `%s`.`customers` MODIFY COLUMN `new_tinyint1_col1` INT;",
+ inventoryDatabase.getDatabaseName()));
+
+ expected.add(
+ new AlterColumnTypeEvent(
+ TableId.tableId(inventoryDatabase.getDatabaseName(), "customers"),
+ Collections.singletonMap("new_tinyint1_col1", DataTypes.INT())));
+
// Test CHANGE COLUMN DDL
statement.execute(
String.format(
@@ -790,7 +866,11 @@ public void testSchemaChangeEvents() throws Exception {
.physicalColumn("big_decimal_c", DataTypes.STRING())
.physicalColumn("bit1_c", DataTypes.BOOLEAN())
.physicalColumn("bit3_c", DataTypes.BINARY(1))
- .physicalColumn("tiny1_c", DataTypes.BOOLEAN())
+ .physicalColumn(
+ "tiny1_c",
+ tinyInt1isBit
+ ? DataTypes.BOOLEAN()
+ : DataTypes.TINYINT())
.physicalColumn("boolean_c", DataTypes.BOOLEAN())
.physicalColumn("file_uuid", DataTypes.BINARY(16))
.physicalColumn("bit_c", DataTypes.BINARY(8))
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java
index 447fda96a23..dee96802cce 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java
@@ -252,7 +252,8 @@ private boolean shouldEmit(SourceRecord sourceRecord) {
RowType splitKeyType =
ChunkUtils.getChunkKeyColumnType(
statefulTaskContext.getDatabaseSchema().tableFor(tableId),
- statefulTaskContext.getSourceConfig().getChunkKeyColumns());
+ statefulTaskContext.getSourceConfig().getChunkKeyColumns(),
+ statefulTaskContext.getSourceConfig().isTreatTinyInt1AsBoolean());
Struct target = RecordUtils.getStructContainsChunkKey(sourceRecord);
Object[] chunkKey =
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/schema/MySqlTypeUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/schema/MySqlTypeUtils.java
index 57763285251..bd770081832 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/schema/MySqlTypeUtils.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/schema/MySqlTypeUtils.java
@@ -107,8 +107,8 @@ public class MySqlTypeUtils {
private static final String UNKNOWN = "UNKNOWN";
/** Returns a corresponding Flink data type from a debezium {@link Column}. */
- public static DataType fromDbzColumn(Column column) {
- DataType dataType = convertFromColumn(column);
+ public static DataType fromDbzColumn(Column column, boolean tinyInt1isBit) {
+ DataType dataType = convertFromColumn(column, tinyInt1isBit);
if (column.isOptional()) {
return dataType;
} else {
@@ -120,7 +120,7 @@ public static DataType fromDbzColumn(Column column) {
* Returns a corresponding Flink data type from a debezium {@link Column} with nullable always
* be true.
*/
- private static DataType convertFromColumn(Column column) {
+ private static DataType convertFromColumn(Column column, boolean tinyInt1isBit) {
String typeName = column.typeName();
switch (typeName) {
case BIT:
@@ -135,7 +135,9 @@ private static DataType convertFromColumn(Column column) {
// user should not use tinyint(1) to store number although jdbc url parameter
// tinyInt1isBit=false can help change the return value, it's not a general way
// btw: mybatis and mysql-connector-java map tinyint(1) to boolean by default
- return column.length() == 1 ? DataTypes.BOOLEAN() : DataTypes.TINYINT();
+ return (column.length() == 1 && tinyInt1isBit)
+ ? DataTypes.BOOLEAN()
+ : DataTypes.TINYINT();
case TINYINT_UNSIGNED:
case TINYINT_UNSIGNED_ZEROFILL:
case SMALLINT:
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java
index d22cc555f9f..df4df026d47 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java
@@ -148,7 +148,9 @@ private void analyzeTable(MySqlPartition partition, TableId tableId) {
splitColumn =
ChunkUtils.getChunkKeyColumn(
currentSplittingTable, sourceConfig.getChunkKeyColumns());
- splitType = ChunkUtils.getChunkKeyColumnType(splitColumn);
+ splitType =
+ ChunkUtils.getChunkKeyColumnType(
+ splitColumn, sourceConfig.isTreatTinyInt1AsBoolean());
minMaxOfSplitColumn =
StatementUtils.queryMinMax(jdbcConnection, tableId, splitColumn.name());
approximateRowCnt = StatementUtils.queryApproximateRowCnt(jdbcConnection, tableId);
@@ -385,7 +387,7 @@ private int getDynamicChunkSize(
Object max,
int chunkSize,
long approximateRowCnt) {
- if (!isEvenlySplitColumn(splitColumn)) {
+ if (!isEvenlySplitColumn(splitColumn, sourceConfig.isTreatTinyInt1AsBoolean())) {
return -1;
}
final double distributionFactorUpper = sourceConfig.getDistributionFactorUpper();
@@ -410,8 +412,8 @@ private int getDynamicChunkSize(
}
/** Checks whether split column is evenly distributed across its range. */
- private static boolean isEvenlySplitColumn(Column splitColumn) {
- DataType flinkType = MySqlTypeUtils.fromDbzColumn(splitColumn);
+ private static boolean isEvenlySplitColumn(Column splitColumn, boolean tinyInt1isBit) {
+ DataType flinkType = MySqlTypeUtils.fromDbzColumn(splitColumn, tinyInt1isBit);
LogicalTypeRoot typeRoot = flinkType.getLogicalType().getTypeRoot();
// currently, we only support the optimization that split column with type BIGINT, INT,
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java
index 85079f28c6d..56d51051615 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java
@@ -74,6 +74,7 @@ public class MySqlSourceConfig implements Serializable {
private final Properties dbzProperties;
private final Configuration dbzConfiguration;
private final MySqlConnectorConfig dbzMySqlConfig;
+ private final boolean treatTinyInt1AsBoolean;
MySqlSourceConfig(
String hostname,
@@ -101,7 +102,8 @@ public class MySqlSourceConfig implements Serializable {
Properties jdbcProperties,
Map chunkKeyColumns,
boolean skipSnapshotBackfill,
- boolean parseOnLineSchemaChanges) {
+ boolean parseOnLineSchemaChanges,
+ boolean treatTinyInt1AsBoolean) {
this.hostname = checkNotNull(hostname);
this.port = port;
this.username = checkNotNull(username);
@@ -130,6 +132,7 @@ public class MySqlSourceConfig implements Serializable {
this.chunkKeyColumns = chunkKeyColumns;
this.skipSnapshotBackfill = skipSnapshotBackfill;
this.parseOnLineSchemaChanges = parseOnLineSchemaChanges;
+ this.treatTinyInt1AsBoolean = treatTinyInt1AsBoolean;
}
public String getHostname() {
@@ -261,4 +264,8 @@ public Map getChunkKeyColumns() {
public boolean isSkipSnapshotBackfill() {
return skipSnapshotBackfill;
}
+
+ public boolean isTreatTinyInt1AsBoolean() {
+ return treatTinyInt1AsBoolean;
+ }
}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java
index f0ca4cc9607..c1459bf23f4 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java
@@ -71,6 +71,7 @@ public class MySqlSourceConfigFactory implements Serializable {
private Map chunkKeyColumns = new HashMap<>();
private boolean skipSnapshotBackfill = false;
private boolean parseOnLineSchemaChanges = false;
+ private boolean treatTinyInt1AsBoolean = true;
public MySqlSourceConfigFactory hostname(String hostname) {
this.hostname = hostname;
@@ -298,6 +299,11 @@ public MySqlSourceConfigFactory parseOnLineSchemaChanges(boolean parseOnLineSche
return this;
}
+ public MySqlSourceConfigFactory treatTinyInt1AsBoolean(boolean treatTinyInt1AsBoolean) {
+ this.treatTinyInt1AsBoolean = treatTinyInt1AsBoolean;
+ return this;
+ }
+
/** Creates a new {@link MySqlSourceConfig} for the given subtask {@code subtaskId}. */
public MySqlSourceConfig createConfig(int subtaskId) {
// hard code server name, because we don't need to distinguish it, docs:
@@ -392,6 +398,7 @@ public MySqlSourceConfig createConfig(int subtaskId, String serverName) {
jdbcProperties,
chunkKeyColumns,
skipSnapshotBackfill,
- parseOnLineSchemaChanges);
+ parseOnLineSchemaChanges,
+ treatTinyInt1AsBoolean);
}
}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/ChunkUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/ChunkUtils.java
index 49325609988..405fd1f9638 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/ChunkUtils.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/ChunkUtils.java
@@ -45,13 +45,15 @@ public class ChunkUtils {
private ChunkUtils() {}
public static RowType getChunkKeyColumnType(
- Table table, Map chunkKeyColumns) {
- return getChunkKeyColumnType(getChunkKeyColumn(table, chunkKeyColumns));
+ Table table, Map chunkKeyColumns, boolean tinyInt1isBit) {
+ return getChunkKeyColumnType(getChunkKeyColumn(table, chunkKeyColumns), tinyInt1isBit);
}
- public static RowType getChunkKeyColumnType(Column chunkKeyColumn) {
+ public static RowType getChunkKeyColumnType(Column chunkKeyColumn, boolean tinyInt1isBit) {
return (RowType)
- ROW(FIELD(chunkKeyColumn.name(), MySqlTypeUtils.fromDbzColumn(chunkKeyColumn)))
+ ROW(FIELD(
+ chunkKeyColumn.name(),
+ MySqlTypeUtils.fromDbzColumn(chunkKeyColumn, tinyInt1isBit)))
.getLogicalType();
}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java
index 7f16cab3f0d..807d29b0bde 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java
@@ -585,7 +585,7 @@ private List getTestAssignSnapshotSplitsFromCheckpoint(AssignerStatus as
RowType splitKeyType =
ChunkUtils.getChunkKeyColumnType(
- Column.editor().name("id").type("INT").jdbcType(4).create());
+ Column.editor().name("id").type("INT").jdbcType(4).create(), true);
List remainingSplits =
Arrays.asList(
new MySqlSchemalessSnapshotSplit(