From 9bbf138d8c70ebec364c0c95ecb6c47ea7ce9117 Mon Sep 17 00:00:00 2001 From: dujie Date: Wed, 18 May 2022 11:13:49 +0800 Subject: [PATCH] [feat-818] [restore]add restore module --- .../converter/BinlogColumnConverter.java | 23 ++- .../jdbc/converter/JdbcColumnConverter.java | 12 +- .../connector/jdbc/sink/JdbcOutputFormat.java | 50 +++++- .../connector/jdbc/sink/JdbcSinkFactory.java | 1 + .../jdbc/sink/PreparedStmtProxy.java | 15 +- .../main/java/com/dtstack/flinkx/Main.java | 70 ++++++++- .../com/dtstack/flinkx/cdc/DdlRowData.java | 20 +++ .../flinkx/cdc/DdlRowDataConvented.java | 143 ++++++++++++++++++ .../flinkx/cdc/RestorationFlatMap.java | 4 +- .../com/dtstack/flinkx/cdc/WrapCollector.java | 36 +++++ .../flinkx/cdc/ddl/ConventException.java | 26 ++++ .../ddl/ConventExceptionProcessHandler.java | 30 ++++ .../dtstack/flinkx/cdc/ddl/DdlConvent.java | 33 ++++ .../LogConventExceptionProcessHandler.java | 41 +++++ .../flinkx/cdc/ddl/SendProcessHandler.java | 32 ++++ .../flinkx/cdc/ddl/entity/ColumnData.java | 25 +++ .../flinkx/cdc/ddl/entity/ColumnEntity.java | 135 +++++++++++++++++ .../flinkx/cdc/ddl/entity/ColumnType.java | 70 +++++++++ .../cdc/ddl/entity/ColumnTypeConvent.java | 23 +++ .../cdc/ddl/entity/CreateTableData.java | 67 ++++++++ .../flinkx/cdc/ddl/entity/DdlData.java | 39 +++++ .../flinkx/cdc/ddl/entity/Identity.java | 29 ++++ .../dtstack/flinkx/cdc/monitor/Monitor.java | 11 ++ .../flinkx/cdc/monitor/fetch/DdlObserver.java | 44 ++++++ .../flinkx/cdc/monitor/fetch/Event.java | 55 +++++++ .../flinkx/cdc/monitor/fetch/FetcherBase.java | 12 +- .../flinkx/cdc/monitor/store/StoreBase.java | 15 +- .../com/dtstack/flinkx/cdc/worker/Worker.java | 11 +- .../flinkx/cdc/worker/WorkerManager.java | 8 +- .../flinkx/cdc/worker/WorkerOverseer.java | 6 +- .../dtstack/flinkx/conf/FlinkxCommonConf.java | 12 ++ .../flinkx/constants/ConstantValue.java | 2 + .../dtstack/flinkx/enums/OperatorType.java | 2 + .../flinkx/mapping/DdlDataNameMapping.java | 72 +++++++++ .../com/dtstack/flinkx/mapping/Mapping.java | 4 +- .../dtstack/flinkx/mapping/MappingClient.java | 28 +++- .../dtstack/flinkx/mapping/NameMapping.java | 2 +- .../flinkx/mapping/NameMappingConf.java | 11 ++ .../flinkx/mapping/NameMappingFlatMap.java | 62 +++++++- .../flinkx/mapping/PatternMapping.java | 2 +- .../com/dtstack/flinkx/sink/SinkFactory.java | 8 + .../sink/format/BaseRichOutputFormat.java | 75 ++++++++- .../format/BaseRichOutputFormatBuilder.java | 5 + .../flinkx/util/DataSyncFactoryUtil.java | 45 +++++- .../util/DdlConventNameConvertUtil.java | 43 ++++++ .../com/dtstack/flinkx/util/PluginUtil.java | 44 ++++++ .../flinkx/util/event/EventCenter.java | 45 ++++++ flinkx-local-test/pom.xml | 17 ++- .../flinkx/restore/mysql/MysqlFetcher.java | 31 +++- .../restore/mysql/MysqlFetcherConstant.java | 8 +- .../flinkx/restore/mysql/MysqlStore.java | 9 +- flinkx-sql/mysql/pom.xml | 79 ++++++++++ .../convent/mysql/MysqlColumnTypeConvent.java | 29 ++++ .../ddl/convent/mysql/MysqlDdlConvent.java | 41 +++++ .../com.dtstack.flinkx.cdc.ddl.DdlConvent | 16 ++ flinkx-sql/pom.xml | 76 ++++++++++ pom.xml | 3 +- 57 files changed, 1787 insertions(+), 70 deletions(-) create mode 100644 flinkx-core/src/main/java/com/dtstack/flinkx/cdc/DdlRowDataConvented.java create mode 100644 flinkx-core/src/main/java/com/dtstack/flinkx/cdc/WrapCollector.java create mode 100644 flinkx-core/src/main/java/com/dtstack/flinkx/cdc/ddl/ConventException.java create mode 100644 flinkx-core/src/main/java/com/dtstack/flinkx/cdc/ddl/ConventExceptionProcessHandler.java create mode 100644 flinkx-core/src/main/java/com/dtstack/flinkx/cdc/ddl/DdlConvent.java create mode 100644 flinkx-core/src/main/java/com/dtstack/flinkx/cdc/ddl/LogConventExceptionProcessHandler.java create mode 100644 flinkx-core/src/main/java/com/dtstack/flinkx/cdc/ddl/SendProcessHandler.java create mode 100644 flinkx-core/src/main/java/com/dtstack/flinkx/cdc/ddl/entity/ColumnData.java create mode 100644 flinkx-core/src/main/java/com/dtstack/flinkx/cdc/ddl/entity/ColumnEntity.java create mode 100644 flinkx-core/src/main/java/com/dtstack/flinkx/cdc/ddl/entity/ColumnType.java create mode 100644 flinkx-core/src/main/java/com/dtstack/flinkx/cdc/ddl/entity/ColumnTypeConvent.java create mode 100644 flinkx-core/src/main/java/com/dtstack/flinkx/cdc/ddl/entity/CreateTableData.java create mode 100644 flinkx-core/src/main/java/com/dtstack/flinkx/cdc/ddl/entity/DdlData.java create mode 100644 flinkx-core/src/main/java/com/dtstack/flinkx/cdc/ddl/entity/Identity.java create mode 100644 flinkx-core/src/main/java/com/dtstack/flinkx/cdc/monitor/fetch/DdlObserver.java create mode 100644 flinkx-core/src/main/java/com/dtstack/flinkx/cdc/monitor/fetch/Event.java create mode 100644 flinkx-core/src/main/java/com/dtstack/flinkx/mapping/DdlDataNameMapping.java create mode 100644 flinkx-core/src/main/java/com/dtstack/flinkx/util/DdlConventNameConvertUtil.java create mode 100644 flinkx-core/src/main/java/com/dtstack/flinkx/util/event/EventCenter.java create mode 100644 flinkx-sql/mysql/pom.xml create mode 100644 flinkx-sql/mysql/src/main/java/com/dtstack/flinkx/ddl/convent/mysql/MysqlColumnTypeConvent.java create mode 100644 flinkx-sql/mysql/src/main/java/com/dtstack/flinkx/ddl/convent/mysql/MysqlDdlConvent.java create mode 100644 flinkx-sql/mysql/src/main/resources/META-INF/services/com.dtstack.flinkx.cdc.ddl.DdlConvent create mode 100644 flinkx-sql/pom.xml diff --git a/flinkx-connectors/flinkx-connector-binlog/src/main/java/com/dtstack/flinkx/connector/binlog/converter/BinlogColumnConverter.java b/flinkx-connectors/flinkx-connector-binlog/src/main/java/com/dtstack/flinkx/connector/binlog/converter/BinlogColumnConverter.java index 6ed53de91d..b58cf28f0d 100644 --- a/flinkx-connectors/flinkx-connector-binlog/src/main/java/com/dtstack/flinkx/connector/binlog/converter/BinlogColumnConverter.java +++ b/flinkx-connectors/flinkx-connector-binlog/src/main/java/com/dtstack/flinkx/connector/binlog/converter/BinlogColumnConverter.java @@ -20,6 +20,7 @@ import com.dtstack.flinkx.cdc.DdlRowData; import com.dtstack.flinkx.cdc.DdlRowDataBuilder; import com.dtstack.flinkx.connector.binlog.listener.BinlogEventRow; +import com.dtstack.flinkx.constants.CDCConstantValue; import com.dtstack.flinkx.constants.ConstantValue; import com.dtstack.flinkx.converter.AbstractCDCRowConverter; import com.dtstack.flinkx.converter.IDeserializationConverter; @@ -82,9 +83,9 @@ public LinkedList toInternal(BinlogEventRow binlogEventRow) throws Exce String schema = binlogEventRow.getSchema(); String table = binlogEventRow.getTable(); String key = schema + ConstantValue.POINT_SYMBOL + table; - List converters = super.cdcConverterCacheMap.get(key); if (rowChange.getIsDdl()) { + super.cdcConverterCacheMap.remove(key); // 处理 ddl rowChange if (rowChange.getEventType().equals(CanalEntry.EventType.ERASE)) { List parse = @@ -152,6 +153,8 @@ public LinkedList toInternal(BinlogEventRow binlogEventRow) throws Exce } } + List converters = super.cdcConverterCacheMap.get(key); + for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) { if (converters == null) { List list = rowData.getBeforeColumnsList(); @@ -181,12 +184,16 @@ public LinkedList toInternal(BinlogEventRow binlogEventRow) throws Exce ColumnRowData columnRowData = new ColumnRowData(size); columnRowData.addField(new StringColumn(schema)); columnRowData.addHeader(SCHEMA); + columnRowData.addExtHeader(CDCConstantValue.SCHEMA); columnRowData.addField(new StringColumn(table)); columnRowData.addHeader(TABLE); + columnRowData.addExtHeader(CDCConstantValue.TABLE); columnRowData.addField(new BigDecimalColumn(super.idWorker.nextId())); columnRowData.addHeader(TS); + columnRowData.addExtHeader(TS); columnRowData.addField(new TimestampColumn(binlogEventRow.getExecuteTime())); columnRowData.addHeader(OP_TIME); + columnRowData.addExtHeader(CDCConstantValue.OP_TIME); List beforeList = rowData.getBeforeColumnsList(); List afterList = rowData.getAfterColumnsList(); @@ -197,9 +204,16 @@ public LinkedList toInternal(BinlogEventRow binlogEventRow) throws Exce List afterHeaderList = new ArrayList<>(afterList.size()); if (pavingData) { + String prefix_before = BEFORE_; + String prefix_after = AFTER_; + if (split) { + prefix_before = ""; + prefix_after = ""; + } + parseColumnList( + converters, beforeList, beforeColumnList, beforeHeaderList, prefix_before); parseColumnList( - converters, beforeList, beforeColumnList, beforeHeaderList, BEFORE_); - parseColumnList(converters, afterList, afterColumnList, afterHeaderList, AFTER_); + converters, afterList, afterColumnList, afterHeaderList, prefix_after); } else { beforeColumnList.add( new MapColumn(processColumnList(rowData.getBeforeColumnsList()))); @@ -215,6 +229,7 @@ public LinkedList toInternal(BinlogEventRow binlogEventRow) throws Exce copy.setRowKind(RowKind.UPDATE_BEFORE); copy.addField(new StringColumn(RowKind.UPDATE_BEFORE.name())); copy.addHeader(TYPE); + copy.addExtHeader(CDCConstantValue.TYPE); copy.addAllField(beforeColumnList); copy.addAllHeader(beforeHeaderList); result.add(copy); @@ -222,10 +237,12 @@ public LinkedList toInternal(BinlogEventRow binlogEventRow) throws Exce columnRowData.setRowKind(RowKind.UPDATE_AFTER); columnRowData.addField(new StringColumn(RowKind.UPDATE_AFTER.name())); columnRowData.addHeader(TYPE); + columnRowData.addExtHeader(CDCConstantValue.TYPE); } else { columnRowData.setRowKind(getRowKindByType(eventType)); columnRowData.addField(new StringColumn(eventType)); columnRowData.addHeader(TYPE); + columnRowData.addExtHeader(CDCConstantValue.TYPE); columnRowData.addAllField(beforeColumnList); columnRowData.addAllHeader(beforeHeaderList); } diff --git a/flinkx-connectors/flinkx-connector-jdbc-base/src/main/java/com/dtstack/flinkx/connector/jdbc/converter/JdbcColumnConverter.java b/flinkx-connectors/flinkx-connector-jdbc-base/src/main/java/com/dtstack/flinkx/connector/jdbc/converter/JdbcColumnConverter.java index bc1836577a..65b4aef28e 100644 --- a/flinkx-connectors/flinkx-connector-jdbc-base/src/main/java/com/dtstack/flinkx/connector/jdbc/converter/JdbcColumnConverter.java +++ b/flinkx-connectors/flinkx-connector-jdbc-base/src/main/java/com/dtstack/flinkx/connector/jdbc/converter/JdbcColumnConverter.java @@ -182,8 +182,16 @@ protected ISerializationConverter createExternalCon case SMALLINT: case INTEGER: case INTERVAL_YEAR_MONTH: - return (val, index, statement) -> - statement.setInt(index, ((ColumnRowData) val).getField(index).asYearInt()); + return (val, index, statement) -> { + int a = 0; + try { + a = ((ColumnRowData) val).getField(index).asYearInt(); + } catch (Exception e) { + LOG.error("val {}, index{}", val, index, e); + } + + statement.setInt(index, a); + }; case FLOAT: return (val, index, statement) -> statement.setFloat(index, ((ColumnRowData) val).getField(index).asFloat()); diff --git a/flinkx-connectors/flinkx-connector-jdbc-base/src/main/java/com/dtstack/flinkx/connector/jdbc/sink/JdbcOutputFormat.java b/flinkx-connectors/flinkx-connector-jdbc-base/src/main/java/com/dtstack/flinkx/connector/jdbc/sink/JdbcOutputFormat.java index d2c40f29ae..0d9885f521 100644 --- a/flinkx-connectors/flinkx-connector-jdbc-base/src/main/java/com/dtstack/flinkx/connector/jdbc/sink/JdbcOutputFormat.java +++ b/flinkx-connectors/flinkx-connector-jdbc-base/src/main/java/com/dtstack/flinkx/connector/jdbc/sink/JdbcOutputFormat.java @@ -17,6 +17,8 @@ */ package com.dtstack.flinkx.connector.jdbc.sink; +import com.dtstack.flinkx.cdc.DdlRowData; +import com.dtstack.flinkx.cdc.DdlRowDataConvented; import com.dtstack.flinkx.conf.FieldConf; import com.dtstack.flinkx.connector.jdbc.conf.JdbcConf; import com.dtstack.flinkx.connector.jdbc.dialect.JdbcDialect; @@ -66,6 +68,7 @@ public class JdbcOutputFormat extends BaseRichOutputFormat { protected JdbcDialect jdbcDialect; protected transient Connection dbConn; + protected boolean autoCommit = true; protected transient PreparedStmtProxy stmtProxy; @@ -85,7 +88,8 @@ protected void openInternal(int taskNumber, int numTasks) { dbConn = getConnection(); // 默认关闭事务自动提交,手动控制事务 if (Semantic.EXACTLY_ONCE == semantic) { - dbConn.setAutoCommit(false); + autoCommit = false; + dbConn.setAutoCommit(autoCommit); } initColumnList(); if (!EWriteMode.INSERT.name().equalsIgnoreCase(jdbcConf.getMode())) { @@ -259,22 +263,28 @@ public void preCommit() throws Exception { @Override public void commit(long checkpointId) throws Exception { + doCommit(); + } + + @Override + public void rollback(long checkpointId) throws Exception { + dbConn.rollback(); + } + + public void doCommit() throws SQLException { try { - dbConn.commit(); + if (!autoCommit) { + dbConn.commit(); + } snapshotWriteCounter.add(rowsOfCurrentTransaction); rowsOfCurrentTransaction = 0; - stmtProxy.clearBatch(); + stmtProxy.clearStatementCache(); } catch (Exception e) { dbConn.rollback(); throw e; } } - @Override - public void rollback(long checkpointId) throws Exception { - dbConn.rollback(); - } - /** * 执行pre、post SQL * @@ -364,6 +374,30 @@ public void closeInternal() { JdbcUtil.closeDbResources(null, null, dbConn, true); } + @Override + protected void executeDdlRwoData(DdlRowData ddlRowData) throws Exception { + if (ddlRowData instanceof DdlRowDataConvented + && !((DdlRowDataConvented) ddlRowData).conventSuccessful()) { + return; + } + Statement statement = dbConn.createStatement(); + statement.execute(ddlRowData.getSql()); + } + + /** + * write all data and commit transaction before execute ddl sql + * + * @param ddlRowData + * @throws Exception + */ + @Override + protected void preExecuteDdlRwoData(DdlRowData ddlRowData) throws Exception { + while (this.rows.size() > 0) { + this.writeRecordInternal(); + } + doCommit(); + } + /** * 获取数据库连接,用于子类覆盖 * diff --git a/flinkx-connectors/flinkx-connector-jdbc-base/src/main/java/com/dtstack/flinkx/connector/jdbc/sink/JdbcSinkFactory.java b/flinkx-connectors/flinkx-connector-jdbc-base/src/main/java/com/dtstack/flinkx/connector/jdbc/sink/JdbcSinkFactory.java index 4a7e63cafb..4282995bf2 100644 --- a/flinkx-connectors/flinkx-connector-jdbc-base/src/main/java/com/dtstack/flinkx/connector/jdbc/sink/JdbcSinkFactory.java +++ b/flinkx-connectors/flinkx-connector-jdbc-base/src/main/java/com/dtstack/flinkx/connector/jdbc/sink/JdbcSinkFactory.java @@ -96,6 +96,7 @@ public DataStreamSink createSink(DataStream dataSet) { JdbcOutputFormatBuilder builder = getBuilder(); builder.setJdbcConf(jdbcConf); builder.setJdbcDialect(jdbcDialect); + builder.setMonitorConfig(monitor); AbstractRowConverter rowConverter = null; // 同步任务使用transform diff --git a/flinkx-connectors/flinkx-connector-jdbc-base/src/main/java/com/dtstack/flinkx/connector/jdbc/sink/PreparedStmtProxy.java b/flinkx-connectors/flinkx-connector-jdbc-base/src/main/java/com/dtstack/flinkx/connector/jdbc/sink/PreparedStmtProxy.java index 1e824e0dad..68dbffb1e8 100644 --- a/flinkx-connectors/flinkx-connector-jdbc-base/src/main/java/com/dtstack/flinkx/connector/jdbc/sink/PreparedStmtProxy.java +++ b/flinkx-connectors/flinkx-connector-jdbc-base/src/main/java/com/dtstack/flinkx/connector/jdbc/sink/PreparedStmtProxy.java @@ -118,6 +118,14 @@ public PreparedStmtProxy( public void convertToExternal(RowData row) throws Exception { getOrCreateFieldNamedPstmt(row); + if (!writeExtInfo) { + if (row instanceof ColumnRowData) { + ColumnRowData copy = ((ColumnRowData) row).copy(); + copy.removeExtHeaderInfo(); + row = copy; + } + } + currentFieldNamedPstmt = (FieldNamedPreparedStatement) currentRowConverter.toExternal(row, this.currentFieldNamedPstmt); @@ -159,9 +167,6 @@ public void getOrCreateFieldNamedPstmt(RowData row) throws ExecutionException { currentFieldNamedPstmt = fieldNamedPreparedStatement.getFieldNamedPreparedStatement(); currentRowConverter = fieldNamedPreparedStatement.getRowConverter(); - if (!writeExtInfo) { - columnRowData.removeExtHeaderInfo(); - } } else { String key = getPstmtCacheKey(jdbcConf.getSchema(), jdbcConf.getTable(), RowKind.INSERT); @@ -351,4 +356,8 @@ public void setClob(int fieldIndex, Reader reader) throws SQLException { public void close() throws SQLException { currentFieldNamedPstmt.close(); } + + public void clearStatementCache() { + pstmtCache.invalidateAll(); + } } diff --git a/flinkx-core/src/main/java/com/dtstack/flinkx/Main.java b/flinkx-core/src/main/java/com/dtstack/flinkx/Main.java index 15d3cbeac9..3eedea5c4f 100644 --- a/flinkx-core/src/main/java/com/dtstack/flinkx/Main.java +++ b/flinkx-core/src/main/java/com/dtstack/flinkx/Main.java @@ -19,6 +19,8 @@ import com.dtstack.flinkx.cdc.CdcConf; import com.dtstack.flinkx.cdc.RestorationFlatMap; +import com.dtstack.flinkx.cdc.ddl.DdlConvent; +import com.dtstack.flinkx.cdc.ddl.SendProcessHandler; import com.dtstack.flinkx.cdc.monitor.fetch.FetcherBase; import com.dtstack.flinkx.cdc.monitor.store.StoreBase; import com.dtstack.flinkx.conf.OperatorConf; @@ -30,7 +32,6 @@ import com.dtstack.flinkx.enums.EJobType; import com.dtstack.flinkx.environment.EnvFactory; import com.dtstack.flinkx.environment.MyLocalStreamEnvironment; -import com.dtstack.flinkx.mapping.NameMappingConf; import com.dtstack.flinkx.mapping.NameMappingFlatMap; import com.dtstack.flinkx.options.OptionParser; import com.dtstack.flinkx.options.Options; @@ -40,6 +41,7 @@ import com.dtstack.flinkx.throwable.FlinkxRuntimeException; import com.dtstack.flinkx.throwable.JobConfigException; import com.dtstack.flinkx.util.DataSyncFactoryUtil; +import com.dtstack.flinkx.util.DdlConventNameConvertUtil; import com.dtstack.flinkx.util.ExecuteProcessHelper; import com.dtstack.flinkx.util.FactoryHelper; import com.dtstack.flinkx.util.JobUtil; @@ -81,6 +83,11 @@ import java.util.Optional; import java.util.Properties; +import static com.dtstack.flinkx.util.PluginUtil.READER_SUFFIX; +import static com.dtstack.flinkx.util.PluginUtil.SINK_SUFFIX; +import static com.dtstack.flinkx.util.PluginUtil.SOURCE_SUFFIX; +import static com.dtstack.flinkx.util.PluginUtil.WRITER_SUFFIX; + /** * The main class entry * @@ -178,21 +185,17 @@ private static void exeSyncJob( SourceFactory sourceFactory = DataSyncFactoryUtil.discoverSource(config, env); DataStream dataStreamSource = sourceFactory.createSource(); + dataStreamSource = addNameMapping(config, dataStreamSource); if (!config.getCdcConf().isSkipDDL()) { CdcConf cdcConf = config.getCdcConf(); Pair monitorPair = - DataSyncFactoryUtil.discoverMonitor(cdcConf.getMonitor(), config); + DataSyncFactoryUtil.discoverFetchBase(cdcConf.getMonitor(), config); dataStreamSource = dataStreamSource.flatMap( new RestorationFlatMap( monitorPair.getLeft(), monitorPair.getRight(), cdcConf)); } - if (config.getNameMappingConf() != null) { - NameMappingConf mappingConf = config.getNameMappingConf(); - dataStreamSource = dataStreamSource.flatMap(new NameMappingFlatMap(mappingConf)); - } - SpeedConf speed = config.getSpeed(); if (speed.getReaderChannel() > 1) { dataStreamSource = @@ -220,7 +223,7 @@ private static void exeSyncJob( if (speed.getWriterChannel() > 0) { dataStreamSink.setParallelism(speed.getWriterChannel()); } - + env.disableOperatorChaining(); JobExecutionResult result = env.execute(options.getJobName()); if (env instanceof MyLocalStreamEnvironment) { PrintUtil.printResult(result.getAllAccumulatorResults()); @@ -351,4 +354,55 @@ private static void checkTableConf(OperatorConf operatorConf) { throw new JobConfigException(operatorConf.getName(), "table.tableName", "is missing"); } } + + private static DataStream addNameMapping( + SyncConf config, DataStream dataStreamSource) { + boolean executeDdlAble = + (boolean) config.getWriter().getParameter().getOrDefault("executeDdlAble", false); + + if (config.getNameMappingConf() != null || executeDdlAble) { + boolean needSqlConvent = executeDdlAble; + + // 相同类型数据源且没有映射关系 此时不需要ddl转换 + if (needSqlConvent + && config.getWriter().getName().equals(config.getReader().getName()) + && config.getNameMappingConf() == null) { + needSqlConvent = false; + } + + if (config.getNameMappingConf() != null + && config.getNameMappingConf().getSqlConevnt() != null) { + needSqlConvent = config.getNameMappingConf().getSqlConevnt(); + } + + if (needSqlConvent) { + String readerName = config.getReader().getName(); + String writerName = config.getWriter().getName(); + + readerName = + DdlConventNameConvertUtil.convertPackageName( + readerName.replace(READER_SUFFIX, "").replace(SOURCE_SUFFIX, "")); + writerName = + DdlConventNameConvertUtil.convertPackageName( + writerName.replace(WRITER_SUFFIX, "").replace(SINK_SUFFIX, "")); + + DdlConvent sourceConvent = DataSyncFactoryUtil.discoverDdlConvent(readerName); + DdlConvent sinkConvent = DataSyncFactoryUtil.discoverDdlConvent(writerName); + dataStreamSource = + dataStreamSource.flatMap( + new NameMappingFlatMap( + config.getNameMappingConf(), + sourceConvent, + sinkConvent, + new SendProcessHandler())); + } else { + dataStreamSource = + dataStreamSource.flatMap( + new NameMappingFlatMap( + config.getNameMappingConf(), null, null, null)); + } + } + + return dataStreamSource; + } } diff --git a/flinkx-core/src/main/java/com/dtstack/flinkx/cdc/DdlRowData.java b/flinkx-core/src/main/java/com/dtstack/flinkx/cdc/DdlRowData.java index 62e8f3ca1d..f25b2f843c 100644 --- a/flinkx-core/src/main/java/com/dtstack/flinkx/cdc/DdlRowData.java +++ b/flinkx-core/src/main/java/com/dtstack/flinkx/cdc/DdlRowData.java @@ -181,4 +181,24 @@ public String getLsn() { public String[] getHeaders() { return headers; } + + public String[] getInfos() { + return ddlInfos; + } + + public void replaceData(String original, String another) { + int index = getIndex(original); + if (index != -1) { + this.ddlInfos[index] = another; + } + } + + public int getIndex(String header) { + for (int i = 0; i < headers.length; i++) { + if (headers[i].equals(header)) { + return i; + } + } + return -1; + } } diff --git a/flinkx-core/src/main/java/com/dtstack/flinkx/cdc/DdlRowDataConvented.java b/flinkx-core/src/main/java/com/dtstack/flinkx/cdc/DdlRowDataConvented.java new file mode 100644 index 0000000000..abbcb50530 --- /dev/null +++ b/flinkx-core/src/main/java/com/dtstack/flinkx/cdc/DdlRowDataConvented.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.flinkx.cdc; + +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.MapData; +import org.apache.flink.table.data.RawValueData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.types.RowKind; + +public class DdlRowDataConvented extends DdlRowData { + + private final DdlRowData rowData; + private final Throwable e; + + public DdlRowDataConvented(DdlRowData rowData, Throwable e) { + super(rowData.getHeaders()); + String[] infos = rowData.getInfos(); + for (int i = 0; i < infos.length; i++) { + setDdlInfo(i, infos[i]); + } + this.rowData = rowData; + this.e = e; + } + + @Override + public int getArity() { + return rowData.getArity(); + } + + @Override + public RowKind getRowKind() { + return rowData.getRowKind(); + } + + @Override + public void setRowKind(RowKind kind) { + rowData.setRowKind(kind); + } + + @Override + public boolean isNullAt(int pos) { + return rowData.isNullAt(pos); + } + + @Override + public boolean getBoolean(int pos) { + return rowData.getBoolean(pos); + } + + @Override + public byte getByte(int pos) { + return rowData.getByte(pos); + } + + @Override + public short getShort(int pos) { + return rowData.getShort(pos); + } + + @Override + public int getInt(int pos) { + return rowData.getInt(pos); + } + + @Override + public long getLong(int pos) { + return rowData.getLong(pos); + } + + @Override + public float getFloat(int pos) { + return rowData.getFloat(pos); + } + + @Override + public double getDouble(int pos) { + return rowData.getDouble(pos); + } + + @Override + public StringData getString(int pos) { + return rowData.getString(pos); + } + + @Override + public DecimalData getDecimal(int pos, int precision, int scale) { + return rowData.getDecimal(pos, precision, scale); + } + + @Override + public TimestampData getTimestamp(int pos, int precision) { + return rowData.getTimestamp(pos, precision); + } + + @Override + public RawValueData getRawValue(int pos) { + return rowData.getRawValue(pos); + } + + @Override + public byte[] getBinary(int pos) { + return rowData.getBinary(pos); + } + + @Override + public ArrayData getArray(int pos) { + return rowData.getArray(pos); + } + + @Override + public MapData getMap(int pos) { + return rowData.getMap(pos); + } + + @Override + public RowData getRow(int pos, int numFields) { + return rowData.getRow(pos, numFields); + } + + public boolean conventSuccessful() { + return e == null; + } +} diff --git a/flinkx-core/src/main/java/com/dtstack/flinkx/cdc/RestorationFlatMap.java b/flinkx-core/src/main/java/com/dtstack/flinkx/cdc/RestorationFlatMap.java index 6931e4b249..29150c3ef3 100644 --- a/flinkx-core/src/main/java/com/dtstack/flinkx/cdc/RestorationFlatMap.java +++ b/flinkx-core/src/main/java/com/dtstack/flinkx/cdc/RestorationFlatMap.java @@ -60,7 +60,9 @@ public void close() throws Exception { public void flatMap(RowData value, Collector out) throws Exception { put(value); if (workerManager.getCollector() == null) { - workerManager.setCollector(out); + WrapCollector wrapCollector = new WrapCollector<>(out); + workerManager.setCollector(wrapCollector); + monitor.setCollector(wrapCollector); } } diff --git a/flinkx-core/src/main/java/com/dtstack/flinkx/cdc/WrapCollector.java b/flinkx-core/src/main/java/com/dtstack/flinkx/cdc/WrapCollector.java new file mode 100644 index 0000000000..a7c55ae46d --- /dev/null +++ b/flinkx-core/src/main/java/com/dtstack/flinkx/cdc/WrapCollector.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.flinkx.cdc; + +import org.apache.flink.util.Collector; + +public class WrapCollector { + private static final Object LOCK = new Object(); + private final Collector collector; + + public WrapCollector(Collector collector) { + this.collector = collector; + } + + public void collect(T var) { + synchronized (LOCK) { + this.collector.collect(var); + } + } +} diff --git a/flinkx-core/src/main/java/com/dtstack/flinkx/cdc/ddl/ConventException.java b/flinkx-core/src/main/java/com/dtstack/flinkx/cdc/ddl/ConventException.java new file mode 100644 index 0000000000..4bc1cf58a7 --- /dev/null +++ b/flinkx-core/src/main/java/com/dtstack/flinkx/cdc/ddl/ConventException.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.flinkx.cdc.ddl; + +public class ConventException extends Throwable { + + public ConventException(String sql, Throwable cause) { + super("convent sql [ " + sql + " ] error", cause); + } +} diff --git a/flinkx-core/src/main/java/com/dtstack/flinkx/cdc/ddl/ConventExceptionProcessHandler.java b/flinkx-core/src/main/java/com/dtstack/flinkx/cdc/ddl/ConventExceptionProcessHandler.java new file mode 100644 index 0000000000..117c86c9e7 --- /dev/null +++ b/flinkx-core/src/main/java/com/dtstack/flinkx/cdc/ddl/ConventExceptionProcessHandler.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.flinkx.cdc.ddl; + +import com.dtstack.flinkx.cdc.DdlRowData; + +import org.apache.flink.table.data.RowData; +import org.apache.flink.util.Collector; + +import java.io.Serializable; + +public interface ConventExceptionProcessHandler extends Serializable { + void process(DdlRowData rowData, ConventException e, Collector collector); +} diff --git a/flinkx-core/src/main/java/com/dtstack/flinkx/cdc/ddl/DdlConvent.java b/flinkx-core/src/main/java/com/dtstack/flinkx/cdc/ddl/DdlConvent.java new file mode 100644 index 0000000000..9df63b1fec --- /dev/null +++ b/flinkx-core/src/main/java/com/dtstack/flinkx/cdc/ddl/DdlConvent.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.flinkx.cdc.ddl; + +import com.dtstack.flinkx.cdc.DdlRowData; +import com.dtstack.flinkx.cdc.ddl.entity.DdlData; + +import java.io.Serializable; + +public interface DdlConvent extends Serializable { + + DdlData rowConventToDdlData(DdlRowData row) throws ConventException; + + String ddlDataConventToSql(DdlData ddldata) throws ConventException; + + String getDataSourceType(); +} diff --git a/flinkx-core/src/main/java/com/dtstack/flinkx/cdc/ddl/LogConventExceptionProcessHandler.java b/flinkx-core/src/main/java/com/dtstack/flinkx/cdc/ddl/LogConventExceptionProcessHandler.java new file mode 100644 index 0000000000..d521c61d17 --- /dev/null +++ b/flinkx-core/src/main/java/com/dtstack/flinkx/cdc/ddl/LogConventExceptionProcessHandler.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.flinkx.cdc.ddl; + +import com.dtstack.flinkx.cdc.DdlRowData; + +import org.apache.flink.table.data.RowData; +import org.apache.flink.util.Collector; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class LogConventExceptionProcessHandler implements ConventExceptionProcessHandler { + private static final Logger LOG = + LoggerFactory.getLogger(LogConventExceptionProcessHandler.class); + + @Override + public void process(DdlRowData rowData, ConventException e, Collector collector) { + LOG.warn( + "table {}, sql {} convent failed ", + rowData.getTableIdentifier(), + rowData.getSql(), + e); + } +} diff --git a/flinkx-core/src/main/java/com/dtstack/flinkx/cdc/ddl/SendProcessHandler.java b/flinkx-core/src/main/java/com/dtstack/flinkx/cdc/ddl/SendProcessHandler.java new file mode 100644 index 0000000000..96fecea475 --- /dev/null +++ b/flinkx-core/src/main/java/com/dtstack/flinkx/cdc/ddl/SendProcessHandler.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.flinkx.cdc.ddl; + +import com.dtstack.flinkx.cdc.DdlRowData; +import com.dtstack.flinkx.cdc.DdlRowDataConvented; + +import org.apache.flink.table.data.RowData; +import org.apache.flink.util.Collector; + +public class SendProcessHandler implements ConventExceptionProcessHandler { + @Override + public void process(DdlRowData rowData, ConventException e, Collector collector) { + collector.collect(new DdlRowDataConvented(rowData, e)); + } +} diff --git a/flinkx-core/src/main/java/com/dtstack/flinkx/cdc/ddl/entity/ColumnData.java b/flinkx-core/src/main/java/com/dtstack/flinkx/cdc/ddl/entity/ColumnData.java new file mode 100644 index 0000000000..137e9ba4fa --- /dev/null +++ b/flinkx-core/src/main/java/com/dtstack/flinkx/cdc/ddl/entity/ColumnData.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.flinkx.cdc.ddl.entity; + +import java.util.List; + +public interface ColumnData { + List getColumnEntity(); +} diff --git a/flinkx-core/src/main/java/com/dtstack/flinkx/cdc/ddl/entity/ColumnEntity.java b/flinkx-core/src/main/java/com/dtstack/flinkx/cdc/ddl/entity/ColumnEntity.java new file mode 100644 index 0000000000..7a4c41a445 --- /dev/null +++ b/flinkx-core/src/main/java/com/dtstack/flinkx/cdc/ddl/entity/ColumnEntity.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.flinkx.cdc.ddl.entity; + +public class ColumnEntity { + /** 字段名称 */ + private String name; + /** 字段类型 */ + private ColumnType type; + /** 字段是否可以为空 */ + private boolean nullable; + /** 是否是主键 */ + private boolean isPrimary; + /** 字段默认值 */ + private String defaultValue; + /** 字段描述 */ + private String comment; + /** 字段长度 */ + protected Integer length; + + /** 小数点长度 */ + protected Integer digital; + + /** 字段精度 */ + protected Integer precision; + + public ColumnEntity( + String name, + ColumnType type, + boolean nullable, + boolean isPrimary, + String defaultValue, + String comment, + Integer length, + Integer digital, + Integer precision) { + this.name = name; + this.type = type; + this.nullable = nullable; + this.isPrimary = isPrimary; + this.defaultValue = defaultValue; + this.comment = comment; + this.length = length; + this.digital = digital; + this.precision = precision; + } + + public String getName() { + return name; + } + + public ColumnType getType() { + return type; + } + + public boolean isNullable() { + return nullable; + } + + public boolean isPrimary() { + return isPrimary; + } + + public String getDefaultValue() { + return defaultValue; + } + + public String getComment() { + return comment; + } + + public Integer getLength() { + return length; + } + + public Integer getDigital() { + return digital; + } + + public Integer getPrecision() { + return precision; + } + + public void setName(String name) { + this.name = name; + } + + public void setType(ColumnType type) { + this.type = type; + } + + public void setNullable(boolean nullable) { + this.nullable = nullable; + } + + public void setPrimary(boolean primary) { + isPrimary = primary; + } + + public void setDefaultValue(String defaultValue) { + this.defaultValue = defaultValue; + } + + public void setComment(String comment) { + this.comment = comment; + } + + public void setLength(Integer length) { + this.length = length; + } + + public void setDigital(Integer digital) { + this.digital = digital; + } + + public void setPrecision(Integer precision) { + this.precision = precision; + } +} diff --git a/flinkx-core/src/main/java/com/dtstack/flinkx/cdc/ddl/entity/ColumnType.java b/flinkx-core/src/main/java/com/dtstack/flinkx/cdc/ddl/entity/ColumnType.java new file mode 100644 index 0000000000..1625fa34b8 --- /dev/null +++ b/flinkx-core/src/main/java/com/dtstack/flinkx/cdc/ddl/entity/ColumnType.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.flinkx.cdc.ddl.entity; + +public enum ColumnType { + BOOLEAN(), + TINYINT(), + SMALLINT(), + INTEGER(), + BIGINT(), + DECIMAL(), + FLOAT(), + REAL(), + DOUBLE(), + DATE(), + TIME(), + TIME_WITH_LOCAL_TIME_ZONE(), + TIMESTAMP(), + TIMESTAMP_WITH_LOCAL_TIME_ZONE(), + INTERVAL_YEAR(), + INTERVAL_YEAR_MONTH(), + INTERVAL_MONTH(), + INTERVAL_DAY(), + INTERVAL_DAY_HOUR(), + INTERVAL_DAY_MINUTE(), + INTERVAL_DAY_SECOND(), + INTERVAL_HOUR(), + INTERVAL_HOUR_MINUTE(), + INTERVAL_HOUR_SECOND(), + INTERVAL_MINUTE(), + INTERVAL_MINUTE_SECOND(), + INTERVAL_SECOND(), + CHAR(), + VARCHAR(), + BINARY(), + VARBINARY(), + NULL(), + ANY(), + SYMBOL(), + MULTISET(), + ARRAY(), + MAP(), + DISTINCT(), + STRUCTURED(), + ROW(), + OTHER(), + CURSOR(), + COLUMN_LIST(), + DYNAMIC_STAR(), + GEOMETRY(), + SARG(); + + ColumnType() {} +} diff --git a/flinkx-core/src/main/java/com/dtstack/flinkx/cdc/ddl/entity/ColumnTypeConvent.java b/flinkx-core/src/main/java/com/dtstack/flinkx/cdc/ddl/entity/ColumnTypeConvent.java new file mode 100644 index 0000000000..00a6c69839 --- /dev/null +++ b/flinkx-core/src/main/java/com/dtstack/flinkx/cdc/ddl/entity/ColumnTypeConvent.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.flinkx.cdc.ddl.entity; + +public interface ColumnTypeConvent { + ColumnType conventColumnType(ColumnType columnType); +} diff --git a/flinkx-core/src/main/java/com/dtstack/flinkx/cdc/ddl/entity/CreateTableData.java b/flinkx-core/src/main/java/com/dtstack/flinkx/cdc/ddl/entity/CreateTableData.java new file mode 100644 index 0000000000..8435e87eda --- /dev/null +++ b/flinkx-core/src/main/java/com/dtstack/flinkx/cdc/ddl/entity/CreateTableData.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.flinkx.cdc.ddl.entity; + +import com.dtstack.flinkx.cdc.EventType; + +import java.util.List; + +public class CreateTableData extends DdlData implements Identity, ColumnData { + private String schema; + private String table; + + private final List columnList; + + public CreateTableData( + EventType type, + String sql, + List columnList, + String schema, + String table) { + super(type, sql); + this.columnList = columnList; + this.schema = schema; + this.table = table; + } + + @Override + public List getColumnEntity() { + return columnList; + } + + @Override + public String getSchema() { + return schema; + } + + @Override + public String getTable() { + return table; + } + + @Override + public void setSchema(String schema) { + this.schema = schema; + } + + @Override + public void setTable(String table) { + this.table = table; + } +} diff --git a/flinkx-core/src/main/java/com/dtstack/flinkx/cdc/ddl/entity/DdlData.java b/flinkx-core/src/main/java/com/dtstack/flinkx/cdc/ddl/entity/DdlData.java new file mode 100644 index 0000000000..538bfabea6 --- /dev/null +++ b/flinkx-core/src/main/java/com/dtstack/flinkx/cdc/ddl/entity/DdlData.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.flinkx.cdc.ddl.entity; + +import com.dtstack.flinkx.cdc.EventType; + +public class DdlData { + public final EventType type; + public final String sql; + + public DdlData(EventType type, String sql) { + this.type = type; + this.sql = sql; + } + + public EventType getType() { + return type; + } + + public String getSql() { + return sql; + } +} diff --git a/flinkx-core/src/main/java/com/dtstack/flinkx/cdc/ddl/entity/Identity.java b/flinkx-core/src/main/java/com/dtstack/flinkx/cdc/ddl/entity/Identity.java new file mode 100644 index 0000000000..2306d45a44 --- /dev/null +++ b/flinkx-core/src/main/java/com/dtstack/flinkx/cdc/ddl/entity/Identity.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.flinkx.cdc.ddl.entity; + +public interface Identity { + String getSchema(); + + String getTable(); + + void setSchema(String schema); + + void setTable(String table); +} diff --git a/flinkx-core/src/main/java/com/dtstack/flinkx/cdc/monitor/Monitor.java b/flinkx-core/src/main/java/com/dtstack/flinkx/cdc/monitor/Monitor.java index f23c1433a3..d6c3e7c0d2 100644 --- a/flinkx-core/src/main/java/com/dtstack/flinkx/cdc/monitor/Monitor.java +++ b/flinkx-core/src/main/java/com/dtstack/flinkx/cdc/monitor/Monitor.java @@ -1,11 +1,14 @@ package com.dtstack.flinkx.cdc.monitor; import com.dtstack.flinkx.cdc.QueuesChamberlain; +import com.dtstack.flinkx.cdc.WrapCollector; import com.dtstack.flinkx.cdc.exception.LogExceptionHandler; import com.dtstack.flinkx.cdc.monitor.fetch.FetcherBase; import com.dtstack.flinkx.cdc.monitor.store.StoreBase; import com.dtstack.flinkx.cdc.utils.ExecutorUtils; +import org.apache.flink.table.data.RowData; + import java.io.Serializable; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutorService; @@ -71,6 +74,14 @@ private void submitStore() { storeExecutor.execute(store); } + public WrapCollector getCollector() { + return this.store.getCollector(); + } + + public void setCollector(WrapCollector collector) { + this.store.setCollector(collector); + } + public void close() { if (fetcher != null) { diff --git a/flinkx-core/src/main/java/com/dtstack/flinkx/cdc/monitor/fetch/DdlObserver.java b/flinkx-core/src/main/java/com/dtstack/flinkx/cdc/monitor/fetch/DdlObserver.java new file mode 100644 index 0000000000..c9359e95c5 --- /dev/null +++ b/flinkx-core/src/main/java/com/dtstack/flinkx/cdc/monitor/fetch/DdlObserver.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.flinkx.cdc.monitor.fetch; + +import com.dtstack.flinkx.cdc.DdlRowData; +import com.dtstack.flinkx.cdc.DdlRowDataBuilder; + +import com.google.common.eventbus.Subscribe; + +public class DdlObserver { + + private FetcherBase fetcherBase; + + public DdlObserver(FetcherBase fetcherBase) { + this.fetcherBase = fetcherBase; + } + + @Subscribe + public void dealMessage(Event event) { + DdlRowData rowData = + DdlRowDataBuilder.builder() + .setDatabaseName(event.getSchema()) + .setTableName(event.getTable()) + .setLsn(event.getLsn()) + .build(); + fetcherBase.update(rowData, event.getStatus()); + } +} diff --git a/flinkx-core/src/main/java/com/dtstack/flinkx/cdc/monitor/fetch/Event.java b/flinkx-core/src/main/java/com/dtstack/flinkx/cdc/monitor/fetch/Event.java new file mode 100644 index 0000000000..5b69a8f770 --- /dev/null +++ b/flinkx-core/src/main/java/com/dtstack/flinkx/cdc/monitor/fetch/Event.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.flinkx.cdc.monitor.fetch; + +public class Event { + private final String schema; + private final String table; + private final String lsn; + private final String sql; + private final Integer status; + + public Event(String schema, String table, String lsn, String sql, Integer status) { + this.schema = schema; + this.table = table; + this.lsn = lsn; + this.sql = sql; + this.status = status; + } + + public String getSchema() { + return schema; + } + + public String getTable() { + return table; + } + + public String getSql() { + return sql; + } + + public Integer getStatus() { + return status; + } + + public String getLsn() { + return lsn; + } +} diff --git a/flinkx-core/src/main/java/com/dtstack/flinkx/cdc/monitor/fetch/FetcherBase.java b/flinkx-core/src/main/java/com/dtstack/flinkx/cdc/monitor/fetch/FetcherBase.java index 980e3477e4..b2165fba04 100644 --- a/flinkx-core/src/main/java/com/dtstack/flinkx/cdc/monitor/fetch/FetcherBase.java +++ b/flinkx-core/src/main/java/com/dtstack/flinkx/cdc/monitor/fetch/FetcherBase.java @@ -38,7 +38,10 @@ public void open() throws Exception { Map ddlRowDataMap = query(); if (null != ddlRowDataMap) { ddlRowDataMap.forEach( - (tableIdentity, ddlData) -> chamberlain.block(tableIdentity, ddlData)); + (tableIdentity, ddlData) -> { + chamberlain.block(tableIdentity, ddlData); + storedTableIdentifier.add(tableIdentity); + }); } } @@ -76,6 +79,13 @@ public void run() { */ public abstract void delete(RowData data); + /** + * 更新外部数据源对应的ddl data 状态 + * + * @param data 需要更新的ddl data + */ + public abstract void update(RowData data, int status); + /** * 查询外部数据源中未被处理的ddl数据,返回map of tableIdentity, ddl-row-data。 * diff --git a/flinkx-core/src/main/java/com/dtstack/flinkx/cdc/monitor/store/StoreBase.java b/flinkx-core/src/main/java/com/dtstack/flinkx/cdc/monitor/store/StoreBase.java index 5ab5b41d57..6e74e034b3 100644 --- a/flinkx-core/src/main/java/com/dtstack/flinkx/cdc/monitor/store/StoreBase.java +++ b/flinkx-core/src/main/java/com/dtstack/flinkx/cdc/monitor/store/StoreBase.java @@ -1,6 +1,7 @@ package com.dtstack.flinkx.cdc.monitor.store; import com.dtstack.flinkx.cdc.QueuesChamberlain; +import com.dtstack.flinkx.cdc.WrapCollector; import org.apache.flink.table.data.RowData; @@ -21,6 +22,8 @@ public abstract class StoreBase implements Runnable, Serializable { protected CopyOnWriteArrayList storedTableIdentifier; + protected WrapCollector collector; + @Override public void run() { while (!closed.get()) { @@ -32,7 +35,9 @@ public void run() { // 将block的ddl数据下发到外部数据源中 final Deque rowDataDeque = chamberlain.fromBlock(table); RowData data = rowDataDeque.peekFirst(); - if (store(data)) { + if (collector != null && store(data)) { + // ddl数据需要往下游发送 sink自身判断是否执行ddl语句 + collector.collect(data); storedTableIdentifier.add(table); } } @@ -52,6 +57,14 @@ public void close() { closeSubclass(); } + public WrapCollector getCollector() { + return collector; + } + + public void setCollector(WrapCollector collector) { + this.collector = collector; + } + /** * 存储row data. * diff --git a/flinkx-core/src/main/java/com/dtstack/flinkx/cdc/worker/Worker.java b/flinkx-core/src/main/java/com/dtstack/flinkx/cdc/worker/Worker.java index 4efd555779..877be17350 100644 --- a/flinkx-core/src/main/java/com/dtstack/flinkx/cdc/worker/Worker.java +++ b/flinkx-core/src/main/java/com/dtstack/flinkx/cdc/worker/Worker.java @@ -21,10 +21,10 @@ package com.dtstack.flinkx.cdc.worker; import com.dtstack.flinkx.cdc.QueuesChamberlain; +import com.dtstack.flinkx.cdc.WrapCollector; import com.dtstack.flinkx.element.ColumnRowData; import org.apache.flink.table.data.RowData; -import org.apache.flink.util.Collector; import java.util.Arrays; import java.util.Deque; @@ -39,9 +39,8 @@ */ public class Worker implements Callable { - private static final Object LOCK = new Object(); private final QueuesChamberlain queuesChamberlain; - private final Collector collector; + private final WrapCollector collector; /** 任务分片 */ private final Chunk chunk; /** 队列遍历深度,避免某队列长时间占用线程 */ @@ -49,7 +48,7 @@ public class Worker implements Callable { public Worker( QueuesChamberlain queuesChamberlain, - Collector collector, + WrapCollector collector, Chunk chunk, int size) { this.queuesChamberlain = queuesChamberlain; @@ -85,9 +84,7 @@ private void send() { private void dealDmL(Deque queue) { // 队列头节点是dml, 将该dml数据发送到sink RowData rowData = queue.poll(); - synchronized (LOCK) { - collector.collect(rowData); - } + collector.collect(rowData); } @Override diff --git a/flinkx-core/src/main/java/com/dtstack/flinkx/cdc/worker/WorkerManager.java b/flinkx-core/src/main/java/com/dtstack/flinkx/cdc/worker/WorkerManager.java index c3c839f5d7..b903232aec 100644 --- a/flinkx-core/src/main/java/com/dtstack/flinkx/cdc/worker/WorkerManager.java +++ b/flinkx-core/src/main/java/com/dtstack/flinkx/cdc/worker/WorkerManager.java @@ -22,11 +22,11 @@ import com.dtstack.flinkx.cdc.CdcConf; import com.dtstack.flinkx.cdc.QueuesChamberlain; +import com.dtstack.flinkx.cdc.WrapCollector; import com.dtstack.flinkx.cdc.exception.LogExceptionHandler; import com.dtstack.flinkx.cdc.utils.ExecutorUtils; import org.apache.flink.table.data.RowData; -import org.apache.flink.util.Collector; import java.io.Serializable; import java.util.concurrent.ThreadPoolExecutor; @@ -51,7 +51,7 @@ public class WorkerManager implements Serializable { private WorkerOverseer overseer; - private Collector collector; + private WrapCollector collector; /** worker的核心线程数 */ private final int workerNum; @@ -100,11 +100,11 @@ public void close() { } } - public Collector getCollector() { + public WrapCollector getCollector() { return collector; } - public void setCollector(Collector collector) { + public void setCollector(WrapCollector collector) { this.collector = collector; // collector赋值后才能通知Overseer启动worker线程 openOverseer(); diff --git a/flinkx-core/src/main/java/com/dtstack/flinkx/cdc/worker/WorkerOverseer.java b/flinkx-core/src/main/java/com/dtstack/flinkx/cdc/worker/WorkerOverseer.java index aee2602945..11f38db4fd 100644 --- a/flinkx-core/src/main/java/com/dtstack/flinkx/cdc/worker/WorkerOverseer.java +++ b/flinkx-core/src/main/java/com/dtstack/flinkx/cdc/worker/WorkerOverseer.java @@ -21,9 +21,9 @@ package com.dtstack.flinkx.cdc.worker; import com.dtstack.flinkx.cdc.QueuesChamberlain; +import com.dtstack.flinkx.cdc.WrapCollector; import org.apache.flink.table.data.RowData; -import org.apache.flink.util.Collector; import java.io.Serializable; import java.util.HashSet; @@ -48,7 +48,7 @@ public class WorkerOverseer implements Runnable, Serializable { private final QueuesChamberlain chamberlain; - private final Collector collector; + private final WrapCollector collector; /** 记录已经被worker线程获得的chunk */ private final Set chunkSet = new HashSet<>(); @@ -62,7 +62,7 @@ public class WorkerOverseer implements Runnable, Serializable { public WorkerOverseer( ThreadPoolExecutor workerExecutor, QueuesChamberlain chamberlain, - Collector collector, + WrapCollector collector, int workerSize) { this.workerExecutor = workerExecutor; this.chamberlain = chamberlain; diff --git a/flinkx-core/src/main/java/com/dtstack/flinkx/conf/FlinkxCommonConf.java b/flinkx-core/src/main/java/com/dtstack/flinkx/conf/FlinkxCommonConf.java index 663e11c7f2..436d6a6c0b 100644 --- a/flinkx-core/src/main/java/com/dtstack/flinkx/conf/FlinkxCommonConf.java +++ b/flinkx-core/src/main/java/com/dtstack/flinkx/conf/FlinkxCommonConf.java @@ -52,6 +52,8 @@ public class FlinkxCommonConf implements Serializable { private int batchSize = 1; /** Time when the timer is regularly written to the database */ private long flushIntervalMills = 10000L; + /** whether to execute ddlRowdata */ + private boolean executeDdlAble; /** sp path */ private String savePointPath; @@ -206,6 +208,14 @@ public void setSavePointPath(String savePointPath) { this.savePointPath = savePointPath; } + public boolean isExecuteDdlAble() { + return executeDdlAble; + } + + public void setExecuteDdlAble(boolean executeDdlAble) { + this.executeDdlAble = executeDdlAble; + } + @Override public String toString() { return "FlinkxCommonConf{" @@ -230,6 +240,8 @@ public String toString() { + column + ", batchSize=" + batchSize + + ", executeDdlAble=" + + executeDdlAble + ", flushIntervalMills=" + flushIntervalMills + ", metricPluginRoot='" diff --git a/flinkx-core/src/main/java/com/dtstack/flinkx/constants/ConstantValue.java b/flinkx-core/src/main/java/com/dtstack/flinkx/constants/ConstantValue.java index c917efb746..02ebfa7ec6 100644 --- a/flinkx-core/src/main/java/com/dtstack/flinkx/constants/ConstantValue.java +++ b/flinkx-core/src/main/java/com/dtstack/flinkx/constants/ConstantValue.java @@ -75,4 +75,6 @@ public class ConstantValue { public static final String DIRTY_DATA_DIR_NAME = "dirty-data-collector"; public static final String RESTORE_DIR_NAME = "restore-plugins"; + + public static final String DDL_DIR_NAME = "ddl"; } diff --git a/flinkx-core/src/main/java/com/dtstack/flinkx/enums/OperatorType.java b/flinkx-core/src/main/java/com/dtstack/flinkx/enums/OperatorType.java index c532a8f2ea..ed047253d0 100644 --- a/flinkx-core/src/main/java/com/dtstack/flinkx/enums/OperatorType.java +++ b/flinkx-core/src/main/java/com/dtstack/flinkx/enums/OperatorType.java @@ -32,5 +32,7 @@ public enum OperatorType { store, /** cdc fetcher plugins */ fetcher, + /** cdc sql conevnt plugins */ + ddlConevnt, ; } diff --git a/flinkx-core/src/main/java/com/dtstack/flinkx/mapping/DdlDataNameMapping.java b/flinkx-core/src/main/java/com/dtstack/flinkx/mapping/DdlDataNameMapping.java new file mode 100644 index 0000000000..d474619a48 --- /dev/null +++ b/flinkx-core/src/main/java/com/dtstack/flinkx/mapping/DdlDataNameMapping.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.flinkx.mapping; + +import com.dtstack.flinkx.cdc.ddl.entity.ColumnData; +import com.dtstack.flinkx.cdc.ddl.entity.ColumnEntity; +import com.dtstack.flinkx.cdc.ddl.entity.DdlData; +import com.dtstack.flinkx.cdc.ddl.entity.Identity; + +import org.apache.commons.collections.MapUtils; + +import java.io.Serializable; +import java.util.List; +import java.util.Map; + +public class DdlDataNameMapping implements Mapping, Serializable { + + private static final long serialVersionUID = 1L; + + private final NameMappingRule mappingRule; + + public DdlDataNameMapping(NameMappingConf conf) { + this.mappingRule = new NameMappingRule(conf); + } + + @Override + public DdlData map(DdlData value) { + + if (value instanceof Identity) { + String table = ((Identity) value).getTable(); + String schema = ((Identity) value).getSchema(); + + String targetSchema = mappingRule.schemaMapping(schema); + String targetTable = mappingRule.tableMapping(schema, table); + + ((Identity) value).setSchema(targetSchema); + ((Identity) value).setTable(targetTable); + } + + if (value instanceof ColumnData && value instanceof Identity) { + String table = ((Identity) value).getTable(); + String schema = ((Identity) value).getSchema(); + Map mapFields = mappingRule.getMapFields(schema, table); + List columnEntity = ((ColumnData) value).getColumnEntity(); + if (MapUtils.isNotEmpty(mapFields)) { + columnEntity.forEach( + filed -> { + String targetField = + mappingRule.fieldMapping(filed.getName(), mapFields); + filed.setName(targetField); + }); + } + } + return value; + } +} diff --git a/flinkx-core/src/main/java/com/dtstack/flinkx/mapping/Mapping.java b/flinkx-core/src/main/java/com/dtstack/flinkx/mapping/Mapping.java index 976f61ae95..0abdd41376 100644 --- a/flinkx-core/src/main/java/com/dtstack/flinkx/mapping/Mapping.java +++ b/flinkx-core/src/main/java/com/dtstack/flinkx/mapping/Mapping.java @@ -38,7 +38,7 @@ import static com.dtstack.flinkx.constants.CDCConstantValue.TABLE; /** @author shitou */ -public interface Mapping { +public interface Mapping { Set META_HEADER = Stream.of("schema", "table", "type", "opTime", "ts", "scn") @@ -50,7 +50,7 @@ public interface Mapping { * @param value RowData * @return RowData */ - RowData map(RowData value); + T map(T value); /** * 获取RowData中table、schema的index diff --git a/flinkx-core/src/main/java/com/dtstack/flinkx/mapping/MappingClient.java b/flinkx-core/src/main/java/com/dtstack/flinkx/mapping/MappingClient.java index 6e95abb1af..d42f783837 100644 --- a/flinkx-core/src/main/java/com/dtstack/flinkx/mapping/MappingClient.java +++ b/flinkx-core/src/main/java/com/dtstack/flinkx/mapping/MappingClient.java @@ -20,9 +20,13 @@ package com.dtstack.flinkx.mapping; +import com.dtstack.flinkx.cdc.ddl.entity.DdlData; + import org.apache.flink.table.data.RowData; import java.io.Serializable; +import java.util.Collections; +import java.util.List; /** * 根据映射规则获取对应的目标信息 @@ -36,14 +40,30 @@ public class MappingClient implements Serializable { private static final long serialVersionUID = 1L; - private final NameMapping nameMapping; + private final List> mappings; + private final List> ddlMappings; public MappingClient(NameMappingConf conf) { - this.nameMapping = new NameMapping(conf); + if (conf == null) { + this.mappings = Collections.emptyList(); + this.ddlMappings = Collections.emptyList(); + } else { + this.mappings = Collections.singletonList(new NameMapping(conf)); + this.ddlMappings = Collections.singletonList(new DdlDataNameMapping(conf)); + } } public RowData map(RowData value) { - // TODO 根据配置选择匹配方式(名称匹配或正则匹配) - return nameMapping.map(value); + for (Mapping mapping : mappings) { + value = mapping.map(value); + } + return value; + } + + public DdlData map(DdlData value) { + for (Mapping mapping : ddlMappings) { + value = mapping.map(value); + } + return value; } } diff --git a/flinkx-core/src/main/java/com/dtstack/flinkx/mapping/NameMapping.java b/flinkx-core/src/main/java/com/dtstack/flinkx/mapping/NameMapping.java index 40497f5e0a..034b347345 100644 --- a/flinkx-core/src/main/java/com/dtstack/flinkx/mapping/NameMapping.java +++ b/flinkx-core/src/main/java/com/dtstack/flinkx/mapping/NameMapping.java @@ -40,7 +40,7 @@ * @author shitou * @date 2021/12/15 */ -public class NameMapping implements Mapping, Serializable { +public class NameMapping implements Mapping, Serializable { private static final long serialVersionUID = 1L; diff --git a/flinkx-core/src/main/java/com/dtstack/flinkx/mapping/NameMappingConf.java b/flinkx-core/src/main/java/com/dtstack/flinkx/mapping/NameMappingConf.java index 16d91a4fce..dfa704ab93 100644 --- a/flinkx-core/src/main/java/com/dtstack/flinkx/mapping/NameMappingConf.java +++ b/flinkx-core/src/main/java/com/dtstack/flinkx/mapping/NameMappingConf.java @@ -41,6 +41,8 @@ public class NameMappingConf implements Serializable { /** 用户自定义的正则 */ private String pattern; + private Boolean sqlConevnt; + public String getPattern() { return pattern; } @@ -73,6 +75,14 @@ public void setFieldMappings(Map fieldMappings) { this.fieldMappings = fieldMappings; } + public Boolean getSqlConevnt() { + return sqlConevnt; + } + + public void setSqlConevnt(Boolean sqlConevnt) { + this.sqlConevnt = sqlConevnt; + } + @Override public String toString() { return new StringJoiner(", ", NameMappingConf.class.getSimpleName() + "[", "]") @@ -80,6 +90,7 @@ public String toString() { .add("schemaMappings=" + schemaMappings) .add("fieldMappings=" + fieldMappings) .add("pattern='" + pattern + "'") + .add("sqlConevnt='" + sqlConevnt + "'") .toString(); } } diff --git a/flinkx-core/src/main/java/com/dtstack/flinkx/mapping/NameMappingFlatMap.java b/flinkx-core/src/main/java/com/dtstack/flinkx/mapping/NameMappingFlatMap.java index f9917b7ba3..3373bfde48 100644 --- a/flinkx-core/src/main/java/com/dtstack/flinkx/mapping/NameMappingFlatMap.java +++ b/flinkx-core/src/main/java/com/dtstack/flinkx/mapping/NameMappingFlatMap.java @@ -20,6 +20,14 @@ package com.dtstack.flinkx.mapping; +import com.dtstack.flinkx.cdc.DdlRowData; +import com.dtstack.flinkx.cdc.DdlRowDataConvented; +import com.dtstack.flinkx.cdc.ddl.ConventException; +import com.dtstack.flinkx.cdc.ddl.ConventExceptionProcessHandler; +import com.dtstack.flinkx.cdc.ddl.DdlConvent; +import com.dtstack.flinkx.cdc.ddl.entity.DdlData; +import com.dtstack.flinkx.element.ColumnRowData; + import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.table.data.RowData; import org.apache.flink.util.Collector; @@ -33,14 +41,62 @@ public class NameMappingFlatMap extends RichFlatMapFunction { private final MappingClient client; + private final DdlConvent source; + private final DdlConvent sink; + private final ConventExceptionProcessHandler conventExceptionProcessHandler; - public NameMappingFlatMap(NameMappingConf conf) { + public NameMappingFlatMap( + NameMappingConf conf, + DdlConvent source, + DdlConvent sink, + ConventExceptionProcessHandler conventExceptionProcessHandler) { this.client = new MappingClient(conf); + this.source = source; + this.sink = sink; + this.conventExceptionProcessHandler = conventExceptionProcessHandler; + check(source, sink, conventExceptionProcessHandler); } @Override - public void flatMap(RowData value, Collector collector) throws Exception { + public void flatMap(RowData value, Collector collector) { RowData rowData = client.map(value); - collector.collect(rowData); + if (rowData instanceof ColumnRowData) { + collector.collect(rowData); + } else if (rowData instanceof DdlRowData) { + if (source != null) { + try { + DdlData data = source.rowConventToDdlData((DdlRowData) rowData); + data = client.map(data); + String s = sink.ddlDataConventToSql(data); + ((DdlRowData) rowData).replaceData("content", s); + collector.collect(new DdlRowDataConvented((DdlRowData) rowData, null)); + } catch (ConventException e) { + conventExceptionProcessHandler.process((DdlRowData) rowData, e, collector); + } + } else { + // 没有转换 就直接传递下游 + collector.collect(value); + } + } + } + + private void check(DdlConvent source, DdlConvent sink, ConventExceptionProcessHandler handler) { + if (source != null || sink != null || handler != null) { + if (source != null && sink != null && handler != null) { + return; + } + StringBuilder s = new StringBuilder(); + if (source == null) { + s.append("source convent not allow null"); + } + if (sink == null) { + s.append("sink convent not allow null"); + } + + if (handler == null) { + s.append("handler not allow null"); + } + throw new IllegalArgumentException(s.toString()); + } } } diff --git a/flinkx-core/src/main/java/com/dtstack/flinkx/mapping/PatternMapping.java b/flinkx-core/src/main/java/com/dtstack/flinkx/mapping/PatternMapping.java index dc75b38268..ac4439d224 100644 --- a/flinkx-core/src/main/java/com/dtstack/flinkx/mapping/PatternMapping.java +++ b/flinkx-core/src/main/java/com/dtstack/flinkx/mapping/PatternMapping.java @@ -30,7 +30,7 @@ * @author shitou * @date 2021/12/15 */ -public class PatternMapping implements Mapping, Serializable { +public class PatternMapping implements Mapping, Serializable { private static final long serialVersionUID = 1L; diff --git a/flinkx-core/src/main/java/com/dtstack/flinkx/sink/SinkFactory.java b/flinkx-core/src/main/java/com/dtstack/flinkx/sink/SinkFactory.java index e20e33fefa..e932ef75e6 100644 --- a/flinkx-core/src/main/java/com/dtstack/flinkx/sink/SinkFactory.java +++ b/flinkx-core/src/main/java/com/dtstack/flinkx/sink/SinkFactory.java @@ -18,6 +18,8 @@ package com.dtstack.flinkx.sink; +import com.dtstack.flinkx.cdc.CdcConf; +import com.dtstack.flinkx.cdc.monitor.MonitorConf; import com.dtstack.flinkx.conf.FlinkxCommonConf; import com.dtstack.flinkx.conf.SpeedConf; import com.dtstack.flinkx.conf.SyncConf; @@ -43,6 +45,7 @@ public abstract class SinkFactory implements RawTypeConvertible { protected SyncConf syncConf; protected boolean useAbstractBaseColumn = true; + protected MonitorConf monitor; public SinkFactory(SyncConf syncConf) { this.syncConf = syncConf; @@ -51,6 +54,11 @@ public SinkFactory(SyncConf syncConf) { && StringUtils.isNotBlank(syncConf.getTransformer().getTransformSql())) { useAbstractBaseColumn = false; } + + CdcConf cdcConf = syncConf.getCdcConf(); + if (cdcConf != null) { + monitor = cdcConf.getMonitor(); + } } /** diff --git a/flinkx-core/src/main/java/com/dtstack/flinkx/sink/format/BaseRichOutputFormat.java b/flinkx-core/src/main/java/com/dtstack/flinkx/sink/format/BaseRichOutputFormat.java index 3598586f20..d5e2deb7d3 100644 --- a/flinkx-core/src/main/java/com/dtstack/flinkx/sink/format/BaseRichOutputFormat.java +++ b/flinkx-core/src/main/java/com/dtstack/flinkx/sink/format/BaseRichOutputFormat.java @@ -18,6 +18,11 @@ package com.dtstack.flinkx.sink.format; +import com.dtstack.flinkx.cdc.DdlRowData; +import com.dtstack.flinkx.cdc.DdlRowDataConvented; +import com.dtstack.flinkx.cdc.monitor.MonitorConf; +import com.dtstack.flinkx.cdc.monitor.fetch.DdlObserver; +import com.dtstack.flinkx.cdc.monitor.fetch.Event; import com.dtstack.flinkx.conf.FlinkxCommonConf; import com.dtstack.flinkx.constants.Metrics; import com.dtstack.flinkx.converter.AbstractRowConverter; @@ -34,8 +39,10 @@ import com.dtstack.flinkx.throwable.FlinkxRuntimeException; import com.dtstack.flinkx.throwable.NoRestartException; import com.dtstack.flinkx.throwable.WriteRecordException; +import com.dtstack.flinkx.util.DataSyncFactoryUtil; import com.dtstack.flinkx.util.ExceptionUtil; import com.dtstack.flinkx.util.JsonUtil; +import com.dtstack.flinkx.util.event.EventCenter; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.accumulators.LongCounter; @@ -165,6 +172,10 @@ public abstract class BaseRichOutputFormat extends RichOutputFormat /** the manager of dirty data. */ protected DirtyManager dirtyManager; + protected boolean executeDdlAble; + protected EventCenter eventCenter; + protected MonitorConf monitorConf; + private transient volatile Exception timerWriteException; @Override @@ -197,6 +208,13 @@ public void open(int taskNumber, int numTasks) throws IOException { this.checkpointEnabled = context.isCheckpointingEnabled(); this.batchSize = config.getBatchSize(); this.rows = new ArrayList<>(batchSize); + this.executeDdlAble = config.isExecuteDdlAble(); + if (executeDdlAble) { + this.eventCenter = new EventCenter("ddl"); + DdlObserver ddlObserver = + new DdlObserver(DataSyncFactoryUtil.discoverFetchBase(monitorConf)); + eventCenter.register(ddlObserver); + } this.flushIntervalMills = config.getFlushIntervalMills(); this.flushEnable = new AtomicBoolean(true); this.semantic = Semantic.getByName(config.getSemantic()); @@ -243,17 +261,21 @@ public void open(int taskNumber, int numTasks) throws IOException { public synchronized void writeRecord(RowData rowData) { checkTimerWriteException(); int size = 0; - if (batchSize <= 1) { - writeSingleRecord(rowData, numWriteCounter); + if (rowData instanceof DdlRowData) { + executeDdlRowDataTemplate((DdlRowData) rowData); size = 1; } else { - rows.add(rowData); - if (rows.size() >= batchSize) { - writeRecordInternal(); - size = batchSize; + if (batchSize <= 1) { + writeSingleRecord(rowData, numWriteCounter); + size = 1; + } else { + rows.add(rowData); + if (rows.size() >= batchSize) { + writeRecordInternal(); + size = batchSize; + } } } - updateDuration(); bytesWriteCounter.add(rowSizeCalculator.getObjectSize(rowData)); if (checkpointEnabled) { @@ -523,6 +545,19 @@ public synchronized FormatState getFormatState() throws Exception { return formatState; } + private void executeDdlRowDataTemplate(DdlRowData ddlRowData) { + try { + preExecuteDdlRwoData(ddlRowData); + if (executeDdlAble) { + executeDdlRwoData(ddlRowData); + postExecuteDdlRwoData(ddlRowData); + } + } catch (Exception e) { + LOG.error("execute ddl {} error", ddlRowData); + throw new RuntimeException(e); + } + } + /** * pre commit data * @@ -579,6 +614,24 @@ public synchronized void notifyCheckpointComplete(long checkpointId) { } } + protected void preExecuteDdlRwoData(DdlRowData rowData) throws Exception {} + + protected void executeDdlRwoData(DdlRowData ddlRowData) throws Exception { + throw new UnsupportedOperationException("not support execute ddlRowData"); + } + + protected void postExecuteDdlRwoData(DdlRowData ddlRowData) throws Exception { + if (ddlRowData instanceof DdlRowDataConvented + && !((DdlRowDataConvented) ddlRowData).conventSuccessful()) { + return; + } + + String tableIdentifier = ddlRowData.getTableIdentifier(); + String[] split = tableIdentifier.split("\\."); + eventCenter.postMessage( + new Event(split[0], split[1], ddlRowData.getLsn(), ddlRowData.getSql(), 2)); + } + /** * commit data * @@ -645,4 +698,12 @@ public void setRowConverter(AbstractRowConverter rowConverter) { public void setDirtyManager(DirtyManager dirtyManager) { this.dirtyManager = dirtyManager; } + + public void setExecuteDdlAble(boolean executeDdlAble) { + this.executeDdlAble = executeDdlAble; + } + + public void setMonitorConf(MonitorConf monitorConf) { + this.monitorConf = monitorConf; + } } diff --git a/flinkx-core/src/main/java/com/dtstack/flinkx/sink/format/BaseRichOutputFormatBuilder.java b/flinkx-core/src/main/java/com/dtstack/flinkx/sink/format/BaseRichOutputFormatBuilder.java index 0346a1936a..c57770b67e 100644 --- a/flinkx-core/src/main/java/com/dtstack/flinkx/sink/format/BaseRichOutputFormatBuilder.java +++ b/flinkx-core/src/main/java/com/dtstack/flinkx/sink/format/BaseRichOutputFormatBuilder.java @@ -18,6 +18,7 @@ package com.dtstack.flinkx.sink.format; +import com.dtstack.flinkx.cdc.monitor.MonitorConf; import com.dtstack.flinkx.conf.FlinkxCommonConf; import com.dtstack.flinkx.constants.ConstantValue; import com.dtstack.flinkx.converter.AbstractRowConverter; @@ -41,6 +42,10 @@ public void setConfig(FlinkxCommonConf config) { format.setConfig(config); } + public void setMonitorConfig(MonitorConf config) { + format.setMonitorConf(config); + } + public void setInitAccumulatorAndDirty(boolean initAccumulatorAndDirty) { this.format.initAccumulatorAndDirty = initAccumulatorAndDirty; } diff --git a/flinkx-core/src/main/java/com/dtstack/flinkx/util/DataSyncFactoryUtil.java b/flinkx-core/src/main/java/com/dtstack/flinkx/util/DataSyncFactoryUtil.java index 2d3a263ed1..d86c650e5a 100644 --- a/flinkx-core/src/main/java/com/dtstack/flinkx/util/DataSyncFactoryUtil.java +++ b/flinkx-core/src/main/java/com/dtstack/flinkx/util/DataSyncFactoryUtil.java @@ -18,6 +18,7 @@ package com.dtstack.flinkx.util; +import com.dtstack.flinkx.cdc.ddl.DdlConvent; import com.dtstack.flinkx.cdc.monitor.MonitorConf; import com.dtstack.flinkx.cdc.monitor.fetch.FetcherBase; import com.dtstack.flinkx.cdc.monitor.store.StoreBase; @@ -38,9 +39,13 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.commons.lang3.tuple.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import sun.misc.Service; import java.lang.reflect.Constructor; import java.net.URL; +import java.util.Iterator; import java.util.Set; /** @@ -50,6 +55,8 @@ */ public class DataSyncFactoryUtil { + private static final Logger LOG = LoggerFactory.getLogger(DataSyncFactoryUtil.class); + public static SourceFactory discoverSource(SyncConf config, StreamExecutionEnvironment env) { try { String pluginName = config.getJob().getReader().getName(); @@ -123,7 +130,7 @@ public static DirtyDataCollector discoverDirty(DirtyConf conf) { } } - public static Pair discoverMonitor( + public static Pair discoverFetchBase( MonitorConf monitorConf, SyncConf syncConf) { try { String pluginType = monitorConf.getType(); @@ -158,4 +165,40 @@ public static Pair discoverMonitor( throw new NoRestartException("Load restore plugins failed!", e); } } + + public static FetcherBase discoverFetchBase(MonitorConf conf) { + try { + String pluginName = conf.getType(); + String pluginClassName = + PluginUtil.getPluginClassName(pluginName, OperatorType.fetcher); + + ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); + Class clazz = classLoader.loadClass(pluginClassName); + Constructor constructor = clazz.getConstructor(MonitorConf.class); + final FetcherBase fetcherBase = (FetcherBase) constructor.newInstance(conf); + fetcherBase.openSubclass(); + return fetcherBase; + } catch (Exception e) { + throw new NoRestartException("Load dirty plugins failed!", e); + } + } + + public static DdlConvent discoverDdlConvent(String pluginType) { + try { + Iterator providers = Service.providers(DdlConvent.class); + while (providers.hasNext()) { + DdlConvent processor = providers.next(); + if (processor.getDataSourceType().equals(pluginType)) { + return processor; + } else { + LOG.info( + "find ddl plugin and support dataSource is {}", + processor.getDataSourceType()); + } + } + } catch (Exception e) { + throw new NoRestartException("Load ddl convent plugins failed!", e); + } + throw new NoRestartException("not found ddl convent plugin!,plugin type is " + pluginType); + } } diff --git a/flinkx-core/src/main/java/com/dtstack/flinkx/util/DdlConventNameConvertUtil.java b/flinkx-core/src/main/java/com/dtstack/flinkx/util/DdlConventNameConvertUtil.java new file mode 100644 index 0000000000..43919719d3 --- /dev/null +++ b/flinkx-core/src/main/java/com/dtstack/flinkx/util/DdlConventNameConvertUtil.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.flinkx.util; + +import org.apache.flink.api.java.tuple.Tuple2; + +import java.util.HashMap; +import java.util.Map; + +public class DdlConventNameConvertUtil { + + // tuple f0 package name && directory name,f1 class name + private static Map> connectorNameMap = new HashMap<>(); + + static { + connectorNameMap.put("binlog", new Tuple2<>("mysql", "mysql")); + connectorNameMap.put("sqlserverecdc", new Tuple2<>("sqlservere", "sqlservere")); + connectorNameMap.put("oraclelogminer", new Tuple2<>("oracle", "oracle")); + } + + public static String convertPackageName(String originName) { + if (!connectorNameMap.containsKey(originName)) { + return originName; + } + return connectorNameMap.get(originName).f0; + } +} diff --git a/flinkx-core/src/main/java/com/dtstack/flinkx/util/PluginUtil.java b/flinkx-core/src/main/java/com/dtstack/flinkx/util/PluginUtil.java index a1866fa563..f0c9d8b68c 100644 --- a/flinkx-core/src/main/java/com/dtstack/flinkx/util/PluginUtil.java +++ b/flinkx-core/src/main/java/com/dtstack/flinkx/util/PluginUtil.java @@ -55,6 +55,7 @@ import java.util.concurrent.Future; import static com.dtstack.flinkx.constants.ConstantValue.CONNECTOR_DIR_NAME; +import static com.dtstack.flinkx.constants.ConstantValue.DDL_DIR_NAME; import static com.dtstack.flinkx.constants.ConstantValue.DIRTY_DATA_DIR_NAME; import static com.dtstack.flinkx.constants.ConstantValue.POINT_SYMBOL; import static com.dtstack.flinkx.constants.ConstantValue.RESTORE_DIR_NAME; @@ -117,6 +118,31 @@ public static Set getJarFileDirPath( return urlSet; } + /** + * 根据插件名称查找ddl解析插件路径 + * + * @param pluginName 插件名称,如: kafkareader、kafkasource等 + * @param pluginRoot + * @param remotePluginPath + * @param suffix + * @return + */ + public static Set getDdlJarFileDirPath( + String pluginName, String pluginRoot, String remotePluginPath, String suffix) { + Set urlSet = new HashSet<>(); + String name = + pluginName + .replace(READER_SUFFIX, "") + .replace(SOURCE_SUFFIX, "") + .replace(WRITER_SUFFIX, "") + .replace(SINK_SUFFIX, ""); + name = DdlConventNameConvertUtil.convertPackageName(name); + getJarUrlList(pluginRoot, suffix, name, urlSet); + getJarUrlList(remotePluginPath, suffix, name, urlSet); + + return urlSet; + } + /** * Obtain local and remote FlinkX plugin jar package path * @@ -316,6 +342,24 @@ public static void registerPluginUrlToCachedFile( config.getRemotePluginPath(), RESTORE_DIR_NAME); urlSet.addAll(restoreUrlSet); + + Set ddlSourceUrlSet = + getDdlJarFileDirPath( + config.getReader().getName(), + config.getPluginRoot(), + config.getRemotePluginPath(), + DDL_DIR_NAME); + + urlSet.addAll(ddlSourceUrlSet); + + Set ddlSinkUrlSet = + getDdlJarFileDirPath( + config.getWriter().getName(), + config.getPluginRoot(), + config.getRemotePluginPath(), + DDL_DIR_NAME); + + urlSet.addAll(ddlSinkUrlSet); } urlSet.addAll(coreUrlSet); diff --git a/flinkx-core/src/main/java/com/dtstack/flinkx/util/event/EventCenter.java b/flinkx-core/src/main/java/com/dtstack/flinkx/util/event/EventCenter.java new file mode 100644 index 0000000000..f645928607 --- /dev/null +++ b/flinkx-core/src/main/java/com/dtstack/flinkx/util/event/EventCenter.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.flinkx.util.event; + +import com.google.common.eventbus.EventBus; + +public class EventCenter { + private final EventBus eventBus; + + public EventCenter(String identify) { + this.eventBus = new EventBus(identify); + } + + public EventBus getEventBus() { + return eventBus; + } + + public void register(Object object) { + eventBus.register(object); + } + + public void unregister(Object object) { + eventBus.unregister(object); + } + + public void postMessage(Object object) { + eventBus.post(object); + } +} diff --git a/flinkx-local-test/pom.xml b/flinkx-local-test/pom.xml index 6f0fe0f8c9..66b5ec1678 100644 --- a/flinkx-local-test/pom.xml +++ b/flinkx-local-test/pom.xml @@ -285,11 +285,11 @@ flinkx-connector-doris ${project.version} - - com.dtstack.flinkx - flinkx-connector-starrocks - ${project.version} - + + + + + @@ -332,5 +332,12 @@ + + + + com.dtstack.flinkx + flinkx-ddl-mysql + ${project.version} + diff --git a/flinkx-restore/flinkx-restore-mysql/src/main/java/com/dtstack/flinkx/restore/mysql/MysqlFetcher.java b/flinkx-restore/flinkx-restore-mysql/src/main/java/com/dtstack/flinkx/restore/mysql/MysqlFetcher.java index 3d1a6e940a..cdd3d409f4 100644 --- a/flinkx-restore/flinkx-restore-mysql/src/main/java/com/dtstack/flinkx/restore/mysql/MysqlFetcher.java +++ b/flinkx-restore/flinkx-restore-mysql/src/main/java/com/dtstack/flinkx/restore/mysql/MysqlFetcher.java @@ -2,6 +2,7 @@ import com.dtstack.flinkx.cdc.DdlRowData; import com.dtstack.flinkx.cdc.DdlRowDataBuilder; +import com.dtstack.flinkx.cdc.DdlRowDataConvented; import com.dtstack.flinkx.cdc.monitor.MonitorConf; import com.dtstack.flinkx.cdc.monitor.fetch.FetcherBase; import com.dtstack.flinkx.restore.mysql.utils.DataSourceUtil; @@ -29,6 +30,7 @@ import static com.dtstack.flinkx.restore.mysql.MysqlFetcherConstant.SELECT; import static com.dtstack.flinkx.restore.mysql.MysqlFetcherConstant.SELECT_CHECK; import static com.dtstack.flinkx.restore.mysql.MysqlFetcherConstant.TABLE_KEY; +import static com.dtstack.flinkx.restore.mysql.MysqlFetcherConstant.UPDATE; /** * @author tiezhu@dtstack.com @@ -50,6 +52,8 @@ public class MysqlFetcher extends FetcherBase { private PreparedStatement query; + private PreparedStatement update; + public MysqlFetcher(MonitorConf conf) { this.conf = conf; } @@ -98,6 +102,25 @@ public void delete(RowData data) { } } + @Override + public void update(RowData data, int status) { + if (data instanceof DdlRowData) { + DdlRowData ddlRowData = (DdlRowData) data; + String tableIdentifier = ddlRowData.getTableIdentifier(); + String[] split = tableIdentifier.split("\\."); + try { + update.setInt(1, status); + update.setString(2, split[0].replace("'", "")); + update.setString(3, split[1].replace("'", "")); + update.setString(4, ddlRowData.getLsn()); + update.execute(); + } catch (SQLException e) { + throw new RuntimeException( + "Delete ddl failed! tableIdentifier: " + tableIdentifier, e); + } + } + } + @Override public Map query() { final Map ddlRowDataMap = new HashMap<>(); @@ -108,7 +131,7 @@ public Map query() { String operationType = resultSet.getString(3); String lsn = resultSet.getString(4); String content = resultSet.getString(5); - + int status = resultSet.getInt(6); DdlRowData ddl = DdlRowDataBuilder.builder() .setDatabaseName(databaseName) @@ -118,6 +141,10 @@ public Map query() { .setContent(content) .build(); + if (status != 0) { + ddl = new DdlRowDataConvented(ddl, new RuntimeException()); + } + String tableIdentity = "'" + databaseName + "'.'" + tableName + "'"; ddlRowDataMap.put(tableIdentity, ddl); } @@ -146,9 +173,11 @@ public void openSubclass() throws Exception { String select = SELECT.replace("$database", database).replace("$table", table); String delete = DELETE.replace("$database", database).replace("$table", table); String query = QUERY.replace("$database", database).replace("$table", table); + String update = UPDATE.replace("$database", database).replace("$table", table); this.select = connection.prepareStatement(select); this.delete = connection.prepareStatement(delete); this.query = connection.prepareStatement(query); + this.update = connection.prepareStatement(update); } /** diff --git a/flinkx-restore/flinkx-restore-mysql/src/main/java/com/dtstack/flinkx/restore/mysql/MysqlFetcherConstant.java b/flinkx-restore/flinkx-restore-mysql/src/main/java/com/dtstack/flinkx/restore/mysql/MysqlFetcherConstant.java index 1ffa729f16..dcd466a06e 100644 --- a/flinkx-restore/flinkx-restore-mysql/src/main/java/com/dtstack/flinkx/restore/mysql/MysqlFetcherConstant.java +++ b/flinkx-restore/flinkx-restore-mysql/src/main/java/com/dtstack/flinkx/restore/mysql/MysqlFetcherConstant.java @@ -35,13 +35,17 @@ private MysqlFetcherConstant() {} + " where status = 2 and database_name = ? and table_name = ? and lsn <= ?"; public static final String QUERY = - "select database_name, table_name, operation_type, lsn, content, update_time from `$database`.`$table`" + "select database_name, table_name, operation_type, lsn, content, update_time, status from `$database`.`$table`" + " where status != 2"; + public static final String UPDATE = + "update `$database`.`$table` set status = ?" + + " where database_name = ? and table_name = ? and lsn = ? and status = 0"; + public static final String INSERT = "INSERT INTO `$database`.`$table` " + "(database_name, table_name, operation_type, lsn, content, update_time, status)" - + " VALUE ($database_name, $table_name, '$operation_type', '$lsn', '$content', $update_time, 0)"; + + " VALUE ($database_name, $table_name, '$operation_type', '$lsn', '$content', $update_time, $status)"; public static final String SELECT_CHECK = "SELECT * FROM `$database`.`$table` WHERE 1 = 2"; diff --git a/flinkx-restore/flinkx-restore-mysql/src/main/java/com/dtstack/flinkx/restore/mysql/MysqlStore.java b/flinkx-restore/flinkx-restore-mysql/src/main/java/com/dtstack/flinkx/restore/mysql/MysqlStore.java index 0965f82488..ee6480f707 100644 --- a/flinkx-restore/flinkx-restore-mysql/src/main/java/com/dtstack/flinkx/restore/mysql/MysqlStore.java +++ b/flinkx-restore/flinkx-restore-mysql/src/main/java/com/dtstack/flinkx/restore/mysql/MysqlStore.java @@ -1,6 +1,7 @@ package com.dtstack.flinkx.restore.mysql; import com.dtstack.flinkx.cdc.DdlRowData; +import com.dtstack.flinkx.cdc.DdlRowDataConvented; import com.dtstack.flinkx.cdc.monitor.MonitorConf; import com.dtstack.flinkx.cdc.monitor.store.StoreBase; import com.dtstack.flinkx.restore.mysql.utils.DataSourceUtil; @@ -62,6 +63,11 @@ public boolean store(RowData data) { String operationType = ddlRowData.getType().getValue(); String lsn = ddlRowData.getLsn(); String sql = ddlRowData.getSql(); + String status = "0"; + if (data instanceof DdlRowDataConvented + && !((DdlRowDataConvented) data).conventSuccessful()) { + status = "-1"; + } String insert = INSERT.replace("$database_name", databaseName) .replace("$table_name", tableName) @@ -70,7 +76,8 @@ public boolean store(RowData data) { .replace("$content", sql) .replace("$update_time", "CURRENT_TIMESTAMP") .replace("$database", ddlDatabase) - .replace("$table", ddlTable); + .replace("$table", ddlTable) + .replace("$status", status); try { preparedStatement.execute(insert); diff --git a/flinkx-sql/mysql/pom.xml b/flinkx-sql/mysql/pom.xml new file mode 100644 index 0000000000..76a922447a --- /dev/null +++ b/flinkx-sql/mysql/pom.xml @@ -0,0 +1,79 @@ + + + + flinkx-sql + com.dtstack.flinkx + 1.12-SNAPSHOT + + 4.0.0 + + flinkx-ddl-mysql + + + 8 + 8 + mysql + + + + + + org.apache.maven.plugins + maven-shade-plugin + 3.1.0 + + + package + + shade + + + false + + + *:* + + support/** + tpcds/** + tpch/** + ddl/** + google/** + + + + + + + + + + maven-antrun-plugin + + + copy-resources + + package + + run + + + + + + + + + + + + + + + + diff --git a/flinkx-sql/mysql/src/main/java/com/dtstack/flinkx/ddl/convent/mysql/MysqlColumnTypeConvent.java b/flinkx-sql/mysql/src/main/java/com/dtstack/flinkx/ddl/convent/mysql/MysqlColumnTypeConvent.java new file mode 100644 index 0000000000..69501bffb4 --- /dev/null +++ b/flinkx-sql/mysql/src/main/java/com/dtstack/flinkx/ddl/convent/mysql/MysqlColumnTypeConvent.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.flinkx.ddl.convent.mysql; + +import com.dtstack.flinkx.cdc.ddl.entity.ColumnType; +import com.dtstack.flinkx.cdc.ddl.entity.ColumnTypeConvent; + +public class MysqlColumnTypeConvent implements ColumnTypeConvent { + @Override + public ColumnType conventColumnType(ColumnType columnType) { + return null; + } +} diff --git a/flinkx-sql/mysql/src/main/java/com/dtstack/flinkx/ddl/convent/mysql/MysqlDdlConvent.java b/flinkx-sql/mysql/src/main/java/com/dtstack/flinkx/ddl/convent/mysql/MysqlDdlConvent.java new file mode 100644 index 0000000000..447ddae16a --- /dev/null +++ b/flinkx-sql/mysql/src/main/java/com/dtstack/flinkx/ddl/convent/mysql/MysqlDdlConvent.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.flinkx.ddl.convent.mysql; + +import com.dtstack.flinkx.cdc.DdlRowData; +import com.dtstack.flinkx.cdc.ddl.ConventException; +import com.dtstack.flinkx.cdc.ddl.DdlConvent; +import com.dtstack.flinkx.cdc.ddl.entity.DdlData; + +public class MysqlDdlConvent implements DdlConvent { + @Override + public DdlData rowConventToDdlData(DdlRowData row) throws ConventException { + throw new ConventException(row.getSql(), new RuntimeException()); + } + + @Override + public String ddlDataConventToSql(DdlData ddldata) throws ConventException { + throw new ConventException(ddldata.getSql(), new RuntimeException()); + } + + @Override + public String getDataSourceType() { + return "mysql"; + } +} diff --git a/flinkx-sql/mysql/src/main/resources/META-INF/services/com.dtstack.flinkx.cdc.ddl.DdlConvent b/flinkx-sql/mysql/src/main/resources/META-INF/services/com.dtstack.flinkx.cdc.ddl.DdlConvent new file mode 100644 index 0000000000..2feb361d34 --- /dev/null +++ b/flinkx-sql/mysql/src/main/resources/META-INF/services/com.dtstack.flinkx.cdc.ddl.DdlConvent @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +com.dtstack.flinkx.ddl.convent.mysql.MysqlDdlConvent diff --git a/flinkx-sql/pom.xml b/flinkx-sql/pom.xml new file mode 100644 index 0000000000..0d27f54e8f --- /dev/null +++ b/flinkx-sql/pom.xml @@ -0,0 +1,76 @@ + + + + flinkx-parent + com.dtstack.flinkx + 1.12-SNAPSHOT + + 4.0.0 + + flinkx-sql + FlinkX : SQL + + mysql + + pom + + + 8 + 8 + sql + + + + + com.dtstack.flinkx + flinkx-core + ${project.version} + provided + + + ch.qos.logback + logback-classic + + + ch.qos.logback + logback-core + + + + + + + + + maven-antrun-plugin + + + copy-resources + + package + + run + + + + + + + + + + + + + + + + + + diff --git a/pom.xml b/pom.xml index e54fde64f0..cff034d8e3 100644 --- a/pom.xml +++ b/pom.xml @@ -20,7 +20,8 @@ flinkx-metrics flinkx-dirtydata-collectors flinkx-restore - + flinkx-sql + UTF-8