Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@

import org.apache.flink.cdc.common.event.AddColumnEvent;
import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DropColumnEvent;
import org.apache.flink.cdc.common.event.RenameColumnEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.DataType;

import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser;
Expand All @@ -30,6 +32,7 @@
import io.debezium.ddl.parser.mysql.generated.MySqlParserBaseListener;
import io.debezium.relational.Column;
import io.debezium.relational.ColumnEditor;
import io.debezium.relational.TableEditor;
import io.debezium.relational.TableId;
import org.antlr.v4.runtime.tree.ParseTreeListener;
import org.slf4j.Logger;
Expand All @@ -41,6 +44,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.apache.flink.cdc.connectors.mysql.utils.MySqlTypeUtils.fromDbzColumn;

Expand All @@ -58,6 +62,7 @@ public class CustomAlterTableParserListener extends MySqlParserBaseListener {
private List<ColumnEditor> columnEditors;

private CustomColumnDefinitionParserListener columnDefinitionListener;
private TableEditor tableEditor;

private int parsingColumnIndex = STARTING_INDEX;

Expand All @@ -70,6 +75,109 @@ public CustomAlterTableParserListener(
this.changes = changes;
}

@Override
public void enterColumnCreateTable(MySqlParser.ColumnCreateTableContext ctx) {
TableId tableId = parser.parseQualifiedTableId(ctx.tableName().fullId());
if (parser.databaseTables().forTable(tableId) == null) {
tableEditor = parser.databaseTables().editOrCreateTable(tableId);
}
super.enterColumnCreateTable(ctx);
}

@Override
public void exitColumnCreateTable(MySqlParser.ColumnCreateTableContext ctx) {
parser.runIfNotNull(
() -> {
// Make sure that the table's character set has been set ...
if (!tableEditor.hasDefaultCharsetName()) {
tableEditor.setDefaultCharsetName(
parser.charsetForTable(tableEditor.tableId()));
}
listeners.remove(columnDefinitionListener);
columnDefinitionListener = null;
// remove column definition parser listener
final String defaultCharsetName = tableEditor.create().defaultCharsetName();
tableEditor.setColumns(
tableEditor.columns().stream()
.map(
column -> {
final ColumnEditor columnEditor = column.edit();
if (columnEditor.charsetNameOfTable() == null) {
columnEditor.charsetNameOfTable(
defaultCharsetName);
}
return columnEditor;
})
.map(ColumnEditor::create)
.collect(Collectors.toList()));
parser.databaseTables().overwriteTable(tableEditor.create());
parser.signalCreateTable(tableEditor.tableId(), ctx);

Schema.Builder builder = Schema.newBuilder();
tableEditor.columns().forEach(column -> builder.column(toCdcColumn(column)));
if (tableEditor.hasPrimaryKey()) {
builder.primaryKey(tableEditor.primaryKeyColumnNames());
}
changes.add(
new CreateTableEvent(
toCdcTableId(tableEditor.tableId()), builder.build()));
},
tableEditor);
super.exitColumnCreateTable(ctx);
}

@Override
public void enterColumnDeclaration(MySqlParser.ColumnDeclarationContext ctx) {
parser.runIfNotNull(
() -> {
String columnName = parser.parseName(ctx.uid());
ColumnEditor columnEditor = Column.editor().name(columnName);
if (columnDefinitionListener == null) {
columnDefinitionListener =
new CustomColumnDefinitionParserListener(
tableEditor, columnEditor, parser, listeners);
listeners.add(columnDefinitionListener);
} else {
columnDefinitionListener.setColumnEditor(columnEditor);
}
},
tableEditor);
super.enterColumnDeclaration(ctx);
}

@Override
public void exitColumnDeclaration(MySqlParser.ColumnDeclarationContext ctx) {
parser.runIfNotNull(
() -> {
tableEditor.addColumn(columnDefinitionListener.getColumn());
},
tableEditor,
columnDefinitionListener);
super.exitColumnDeclaration(ctx);
}

@Override
public void enterPrimaryKeyTableConstraint(MySqlParser.PrimaryKeyTableConstraintContext ctx) {
parser.runIfNotNull(
() -> {
parser.parsePrimaryIndexColumnNames(ctx.indexColumnNames(), tableEditor);
},
tableEditor);
super.enterPrimaryKeyTableConstraint(ctx);
}

@Override
public void enterUniqueKeyTableConstraint(MySqlParser.UniqueKeyTableConstraintContext ctx) {
parser.runIfNotNull(
() -> {
if (!tableEditor.hasPrimaryKey()) {
parser.parsePrimaryIndexColumnNames(ctx.indexColumnNames(), tableEditor);
}
},
tableEditor);
super.enterUniqueKeyTableConstraint(ctx);
}

@Override
public void enterAlterTable(MySqlParser.AlterTableContext ctx) {
this.currentTable = toCdcTableId(parser.parseQualifiedTableId(ctx.tableName().fullId()));
Expand All @@ -88,7 +196,8 @@ public void enterAlterByAddColumn(MySqlParser.AlterByAddColumnContext ctx) {
String columnName = parser.parseName(ctx.uid(0));
ColumnEditor columnEditor = Column.editor().name(columnName);
columnDefinitionListener =
new CustomColumnDefinitionParserListener(columnEditor, parser, listeners);
new CustomColumnDefinitionParserListener(
tableEditor, columnEditor, parser, listeners);
listeners.add(columnDefinitionListener);
super.exitAlterByAddColumn(ctx);
}
Expand Down Expand Up @@ -140,7 +249,8 @@ public void enterAlterByAddColumns(MySqlParser.AlterByAddColumnsContext ctx) {
columnEditors.add(Column.editor().name(columnName));
}
columnDefinitionListener =
new CustomColumnDefinitionParserListener(columnEditors.get(0), parser, listeners);
new CustomColumnDefinitionParserListener(
tableEditor, columnEditors.get(0), parser, listeners);
listeners.add(columnDefinitionListener);
super.enterAlterByAddColumns(ctx);
}
Expand Down Expand Up @@ -190,7 +300,8 @@ public void enterAlterByChangeColumn(MySqlParser.AlterByChangeColumnContext ctx)
columnEditor.unsetDefaultValueExpression();

columnDefinitionListener =
new CustomColumnDefinitionParserListener(columnEditor, parser, listeners);
new CustomColumnDefinitionParserListener(
tableEditor, columnEditor, parser, listeners);
listeners.add(columnDefinitionListener);
super.enterAlterByChangeColumn(ctx);
}
Expand Down Expand Up @@ -229,7 +340,8 @@ public void enterAlterByRenameColumn(MySqlParser.AlterByRenameColumnContext ctx)
String oldColumnName = parser.parseName(ctx.oldColumn);
ColumnEditor columnEditor = Column.editor().name(oldColumnName);
columnDefinitionListener =
new CustomColumnDefinitionParserListener(columnEditor, parser, listeners);
new CustomColumnDefinitionParserListener(
tableEditor, columnEditor, parser, listeners);
listeners.add(columnDefinitionListener);
super.enterAlterByRenameColumn(ctx);
}
Expand All @@ -241,7 +353,8 @@ public void enterAlterByModifyColumn(MySqlParser.AlterByModifyColumnContext ctx)
columnEditor.unsetDefaultValueExpression();

columnDefinitionListener =
new CustomColumnDefinitionParserListener(columnEditor, parser, listeners);
new CustomColumnDefinitionParserListener(
tableEditor, columnEditor, parser, listeners);
listeners.add(columnDefinitionListener);
super.enterAlterByModifyColumn(ctx);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.debezium.ddl.parser.mysql.generated.MySqlParserBaseListener;
import io.debezium.relational.Column;
import io.debezium.relational.ColumnEditor;
import io.debezium.relational.TableEditor;
import io.debezium.relational.ddl.DataType;
import io.debezium.util.Strings;
import org.antlr.v4.runtime.tree.ParseTreeListener;
Expand All @@ -50,13 +51,16 @@ public class CustomColumnDefinitionParserListener extends MySqlParserBaseListene
private boolean uniqueColumn;
private AtomicReference<Boolean> optionalColumn = new AtomicReference<>();
private DefaultValueParserListener defaultValueListener;
private final TableEditor tableEditor;

private final List<ParseTreeListener> listeners;

public CustomColumnDefinitionParserListener(
TableEditor tableEditor,
ColumnEditor columnEditor,
MySqlAntlrDdlParser parser,
List<ParseTreeListener> listeners) {
this.tableEditor = tableEditor;
this.columnEditor = columnEditor;
this.parser = parser;
this.dataTypeResolver = parser.dataTypeResolver();
Expand Down Expand Up @@ -106,6 +110,8 @@ public void enterPrimaryKeyColumnConstraint(MySqlParser.PrimaryKeyColumnConstrai
// this rule will be parsed only if no primary key is set in a table
// otherwise the statement can't be executed due to multiple primary key error
optionalColumn.set(Boolean.FALSE);
tableEditor.addColumn(columnEditor.create());
tableEditor.setPrimaryKeyNames(columnEditor.name());
super.enterPrimaryKeyColumnConstraint(ctx);
}

Expand Down
Loading