From 2e5dae0d7c0044a9b77c7c349aa4724e0f500986 Mon Sep 17 00:00:00 2001 From: haoke Date: Thu, 23 May 2024 16:35:41 +0800 Subject: [PATCH 1/3] [FLINK-35432]Support catch modify event for the mysql. --- .../CustomAlterTableParserListener.java | 26 +++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java index 6f3fb9a8ccc..e201fcbd67d 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java @@ -234,6 +234,32 @@ public void enterAlterByRenameColumn(MySqlParser.AlterByRenameColumnContext ctx) super.enterAlterByRenameColumn(ctx); } + @Override + public void enterAlterByModifyColumn(MySqlParser.AlterByModifyColumnContext ctx) { + String oldColumnName = parser.parseName(ctx.uid(0)); + ColumnEditor columnEditor = Column.editor().name(oldColumnName); + columnEditor.unsetDefaultValueExpression(); + + columnDefinitionListener = + new CustomColumnDefinitionParserListener(columnEditor, parser, listeners); + listeners.add(columnDefinitionListener); + super.enterAlterByModifyColumn(ctx); + } + + @Override + public void exitAlterByModifyColumn(MySqlParser.AlterByModifyColumnContext ctx) { + parser.runIfNotNull( + () -> { + Column column = columnDefinitionListener.getColumn(); + Map typeMapping = new HashMap<>(); + typeMapping.put(column.name(), fromDbzColumn(column)); + changes.add(new AlterColumnTypeEvent(currentTable, typeMapping)); + listeners.remove(columnDefinitionListener); + }, + columnDefinitionListener); + super.exitAlterByModifyColumn(ctx); + } + @Override public void exitAlterByRenameColumn(MySqlParser.AlterByRenameColumnContext ctx) { parser.runIfNotNull( From 9468f0fca0582f92fdc0f10f4fe763edc72d1c72 Mon Sep 17 00:00:00 2001 From: haoke Date: Thu, 23 May 2024 16:46:08 +0800 Subject: [PATCH 2/3] [FLINK-35432]add unittest for modify sql. --- .../cdc/connectors/mysql/source/MySqlPipelineITCase.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java index 5eb6ce0e598..092a07b4d29 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java @@ -560,6 +560,14 @@ private List executeAlterAndProvideExpected(TableId tableId, Statement st inventoryDatabase.getDatabaseName())); expected.add(new DropColumnEvent(tableId, Collections.singletonList("DESC3"))); + statement.execute( + String.format( + "ALTER TABLE `%s`.`products` MODIFY COLUMN `DESC3` VARCHAR(255) NULL DEFAULT NULL;", + inventoryDatabase.getDatabaseName())); + expected.add( + new AlterColumnTypeEvent( + tableId, Collections.singletonMap("DESC3", DataTypes.VARCHAR(255)))); + // Should not catch SchemaChangeEvent of tables other than `products` statement.execute( String.format( From 22963306087814ba4688b1efac086b8d81840c76 Mon Sep 17 00:00:00 2001 From: haoke Date: Thu, 23 May 2024 16:52:32 +0800 Subject: [PATCH 3/3] [FLINK-35432]fixed checkstyle. --- .../connectors/mysql/source/MySqlPipelineITCase.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java index 092a07b4d29..07945432320 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java @@ -554,12 +554,6 @@ private List executeAlterAndProvideExpected(TableId tableId, Statement st inventoryDatabase.getDatabaseName())); expected.add(new RenameColumnEvent(tableId, Collections.singletonMap("desc1", "desc3"))); - statement.execute( - String.format( - "ALTER TABLE `%s`.`products` DROP COLUMN `DESC3`;", - inventoryDatabase.getDatabaseName())); - expected.add(new DropColumnEvent(tableId, Collections.singletonList("DESC3"))); - statement.execute( String.format( "ALTER TABLE `%s`.`products` MODIFY COLUMN `DESC3` VARCHAR(255) NULL DEFAULT NULL;", @@ -568,6 +562,12 @@ private List executeAlterAndProvideExpected(TableId tableId, Statement st new AlterColumnTypeEvent( tableId, Collections.singletonMap("DESC3", DataTypes.VARCHAR(255)))); + statement.execute( + String.format( + "ALTER TABLE `%s`.`products` DROP COLUMN `DESC3`;", + inventoryDatabase.getDatabaseName())); + expected.add(new DropColumnEvent(tableId, Collections.singletonList("DESC3"))); + // Should not catch SchemaChangeEvent of tables other than `products` statement.execute( String.format(