From fe8663837ca5e205ce3ab8e1aa9c5e5829683054 Mon Sep 17 00:00:00 2001 From: ChengJie1053 <18033291053@163.com> Date: Fri, 7 Jun 2024 19:33:21 +0800 Subject: [PATCH] [cdc][mysql]Mysql supports modify column schema changes --- .../CustomAlterTableParserListener.java | 29 +++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java index 6f3fb9a8ccc..9b3824fbeee 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java @@ -251,6 +251,35 @@ public void exitAlterByRenameColumn(MySqlParser.AlterByRenameColumnContext ctx) super.exitAlterByRenameColumn(ctx); } + @Override + public void enterAlterByModifyColumn(MySqlParser.AlterByModifyColumnContext ctx) { + String columnName = parser.parseName(ctx.uid(0)); + ColumnEditor columnEditor = Column.editor().name(columnName); + columnEditor.unsetDefaultValueExpression(); + + columnDefinitionListener = + new CustomColumnDefinitionParserListener(columnEditor, parser, listeners); + listeners.add(columnDefinitionListener); + super.enterAlterByModifyColumn(ctx); + } + + @Override + public void exitAlterByModifyColumn(MySqlParser.AlterByModifyColumnContext ctx) { + parser.runIfNotNull( + () -> { + Column column = columnDefinitionListener.getColumn(); + if (column != null) { + Map typeMapping = new HashMap<>(); + typeMapping.put(column.name(), fromDbzColumn(column)); + changes.add(new AlterColumnTypeEvent(currentTable, typeMapping)); + } + + listeners.remove(columnDefinitionListener); + }, + columnDefinitionListener); + super.exitAlterByModifyColumn(ctx); + } + private org.apache.flink.cdc.common.schema.Column toCdcColumn(Column dbzColumn) { return org.apache.flink.cdc.common.schema.Column.physicalColumn( dbzColumn.name(), fromDbzColumn(dbzColumn), dbzColumn.comment());