Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.dtstack.flinkx.cdc.DdlRowData;
import com.dtstack.flinkx.cdc.DdlRowDataBuilder;
import com.dtstack.flinkx.connector.binlog.listener.BinlogEventRow;
import com.dtstack.flinkx.constants.CDCConstantValue;
import com.dtstack.flinkx.constants.ConstantValue;
import com.dtstack.flinkx.converter.AbstractCDCRowConverter;
import com.dtstack.flinkx.converter.IDeserializationConverter;
Expand Down Expand Up @@ -82,9 +83,9 @@ public LinkedList<RowData> toInternal(BinlogEventRow binlogEventRow) throws Exce
String schema = binlogEventRow.getSchema();
String table = binlogEventRow.getTable();
String key = schema + ConstantValue.POINT_SYMBOL + table;
List<IDeserializationConverter> converters = super.cdcConverterCacheMap.get(key);

if (rowChange.getIsDdl()) {
super.cdcConverterCacheMap.remove(key);
// 处理 ddl rowChange
if (rowChange.getEventType().equals(CanalEntry.EventType.ERASE)) {
List<DdlResult> parse =
Expand Down Expand Up @@ -152,6 +153,8 @@ public LinkedList<RowData> toInternal(BinlogEventRow binlogEventRow) throws Exce
}
}

List<IDeserializationConverter> converters = super.cdcConverterCacheMap.get(key);

for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
if (converters == null) {
List<CanalEntry.Column> list = rowData.getBeforeColumnsList();
Expand Down Expand Up @@ -181,12 +184,16 @@ public LinkedList<RowData> toInternal(BinlogEventRow binlogEventRow) throws Exce
ColumnRowData columnRowData = new ColumnRowData(size);
columnRowData.addField(new StringColumn(schema));
columnRowData.addHeader(SCHEMA);
columnRowData.addExtHeader(CDCConstantValue.SCHEMA);
columnRowData.addField(new StringColumn(table));
columnRowData.addHeader(TABLE);
columnRowData.addExtHeader(CDCConstantValue.TABLE);
columnRowData.addField(new BigDecimalColumn(super.idWorker.nextId()));
columnRowData.addHeader(TS);
columnRowData.addExtHeader(TS);
columnRowData.addField(new TimestampColumn(binlogEventRow.getExecuteTime()));
columnRowData.addHeader(OP_TIME);
columnRowData.addExtHeader(CDCConstantValue.OP_TIME);

List<CanalEntry.Column> beforeList = rowData.getBeforeColumnsList();
List<CanalEntry.Column> afterList = rowData.getAfterColumnsList();
Expand All @@ -197,9 +204,16 @@ public LinkedList<RowData> toInternal(BinlogEventRow binlogEventRow) throws Exce
List<String> afterHeaderList = new ArrayList<>(afterList.size());

if (pavingData) {
String prefix_before = BEFORE_;
String prefix_after = AFTER_;
if (split) {
prefix_before = "";
prefix_after = "";
}
parseColumnList(
converters, beforeList, beforeColumnList, beforeHeaderList, prefix_before);
parseColumnList(
converters, beforeList, beforeColumnList, beforeHeaderList, BEFORE_);
parseColumnList(converters, afterList, afterColumnList, afterHeaderList, AFTER_);
converters, afterList, afterColumnList, afterHeaderList, prefix_after);
} else {
beforeColumnList.add(
new MapColumn(processColumnList(rowData.getBeforeColumnsList())));
Expand All @@ -215,17 +229,20 @@ public LinkedList<RowData> toInternal(BinlogEventRow binlogEventRow) throws Exce
copy.setRowKind(RowKind.UPDATE_BEFORE);
copy.addField(new StringColumn(RowKind.UPDATE_BEFORE.name()));
copy.addHeader(TYPE);
copy.addExtHeader(CDCConstantValue.TYPE);
copy.addAllField(beforeColumnList);
copy.addAllHeader(beforeHeaderList);
result.add(copy);

columnRowData.setRowKind(RowKind.UPDATE_AFTER);
columnRowData.addField(new StringColumn(RowKind.UPDATE_AFTER.name()));
columnRowData.addHeader(TYPE);
columnRowData.addExtHeader(CDCConstantValue.TYPE);
} else {
columnRowData.setRowKind(getRowKindByType(eventType));
columnRowData.addField(new StringColumn(eventType));
columnRowData.addHeader(TYPE);
columnRowData.addExtHeader(CDCConstantValue.TYPE);
columnRowData.addAllField(beforeColumnList);
columnRowData.addAllHeader(beforeHeaderList);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,16 @@ protected ISerializationConverter<FieldNamedPreparedStatement> createExternalCon
case SMALLINT:
case INTEGER:
case INTERVAL_YEAR_MONTH:
return (val, index, statement) ->
statement.setInt(index, ((ColumnRowData) val).getField(index).asYearInt());
return (val, index, statement) -> {
int a = 0;
try {
a = ((ColumnRowData) val).getField(index).asYearInt();
} catch (Exception e) {
LOG.error("val {}, index{}", val, index, e);
}

statement.setInt(index, a);
};
case FLOAT:
return (val, index, statement) ->
statement.setFloat(index, ((ColumnRowData) val).getField(index).asFloat());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/
package com.dtstack.flinkx.connector.jdbc.sink;

import com.dtstack.flinkx.cdc.DdlRowData;
import com.dtstack.flinkx.cdc.DdlRowDataConvented;
import com.dtstack.flinkx.conf.FieldConf;
import com.dtstack.flinkx.connector.jdbc.conf.JdbcConf;
import com.dtstack.flinkx.connector.jdbc.dialect.JdbcDialect;
Expand Down Expand Up @@ -66,6 +68,7 @@ public class JdbcOutputFormat extends BaseRichOutputFormat {
protected JdbcDialect jdbcDialect;

protected transient Connection dbConn;
protected boolean autoCommit = true;

protected transient PreparedStmtProxy stmtProxy;

Expand All @@ -85,7 +88,8 @@ protected void openInternal(int taskNumber, int numTasks) {
dbConn = getConnection();
// 默认关闭事务自动提交,手动控制事务
if (Semantic.EXACTLY_ONCE == semantic) {
dbConn.setAutoCommit(false);
autoCommit = false;
dbConn.setAutoCommit(autoCommit);
}
initColumnList();
if (!EWriteMode.INSERT.name().equalsIgnoreCase(jdbcConf.getMode())) {
Expand Down Expand Up @@ -259,22 +263,28 @@ public void preCommit() throws Exception {

@Override
public void commit(long checkpointId) throws Exception {
doCommit();
}

@Override
public void rollback(long checkpointId) throws Exception {
dbConn.rollback();
}

public void doCommit() throws SQLException {
try {
dbConn.commit();
if (!autoCommit) {
dbConn.commit();
}
snapshotWriteCounter.add(rowsOfCurrentTransaction);
rowsOfCurrentTransaction = 0;
stmtProxy.clearBatch();
stmtProxy.clearStatementCache();
} catch (Exception e) {
dbConn.rollback();
throw e;
}
}

@Override
public void rollback(long checkpointId) throws Exception {
dbConn.rollback();
}

/**
* 执行pre、post SQL
*
Expand Down Expand Up @@ -364,6 +374,30 @@ public void closeInternal() {
JdbcUtil.closeDbResources(null, null, dbConn, true);
}

@Override
protected void executeDdlRwoData(DdlRowData ddlRowData) throws Exception {
if (ddlRowData instanceof DdlRowDataConvented
&& !((DdlRowDataConvented) ddlRowData).conventSuccessful()) {
return;
}
Statement statement = dbConn.createStatement();
statement.execute(ddlRowData.getSql());
}

/**
* write all data and commit transaction before execute ddl sql
*
* @param ddlRowData
* @throws Exception
*/
@Override
protected void preExecuteDdlRwoData(DdlRowData ddlRowData) throws Exception {
while (this.rows.size() > 0) {
this.writeRecordInternal();
}
doCommit();
}

/**
* 获取数据库连接,用于子类覆盖
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ public DataStreamSink<RowData> createSink(DataStream<RowData> dataSet) {
JdbcOutputFormatBuilder builder = getBuilder();
builder.setJdbcConf(jdbcConf);
builder.setJdbcDialect(jdbcDialect);
builder.setMonitorConfig(monitor);

AbstractRowConverter rowConverter = null;
// 同步任务使用transform
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,14 @@ public PreparedStmtProxy(

public void convertToExternal(RowData row) throws Exception {
getOrCreateFieldNamedPstmt(row);
if (!writeExtInfo) {
if (row instanceof ColumnRowData) {
ColumnRowData copy = ((ColumnRowData) row).copy();
copy.removeExtHeaderInfo();
row = copy;
}
}

currentFieldNamedPstmt =
(FieldNamedPreparedStatement)
currentRowConverter.toExternal(row, this.currentFieldNamedPstmt);
Expand Down Expand Up @@ -159,9 +167,6 @@ public void getOrCreateFieldNamedPstmt(RowData row) throws ExecutionException {

currentFieldNamedPstmt = fieldNamedPreparedStatement.getFieldNamedPreparedStatement();
currentRowConverter = fieldNamedPreparedStatement.getRowConverter();
if (!writeExtInfo) {
columnRowData.removeExtHeaderInfo();
}
} else {
String key =
getPstmtCacheKey(jdbcConf.getSchema(), jdbcConf.getTable(), RowKind.INSERT);
Expand Down Expand Up @@ -351,4 +356,8 @@ public void setClob(int fieldIndex, Reader reader) throws SQLException {
public void close() throws SQLException {
currentFieldNamedPstmt.close();
}

public void clearStatementCache() {
pstmtCache.invalidateAll();
}
}
70 changes: 62 additions & 8 deletions flinkx-core/src/main/java/com/dtstack/flinkx/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import com.dtstack.flinkx.cdc.CdcConf;
import com.dtstack.flinkx.cdc.RestorationFlatMap;
import com.dtstack.flinkx.cdc.ddl.DdlConvent;
import com.dtstack.flinkx.cdc.ddl.SendProcessHandler;
import com.dtstack.flinkx.cdc.monitor.fetch.FetcherBase;
import com.dtstack.flinkx.cdc.monitor.store.StoreBase;
import com.dtstack.flinkx.conf.OperatorConf;
Expand All @@ -30,7 +32,6 @@
import com.dtstack.flinkx.enums.EJobType;
import com.dtstack.flinkx.environment.EnvFactory;
import com.dtstack.flinkx.environment.MyLocalStreamEnvironment;
import com.dtstack.flinkx.mapping.NameMappingConf;
import com.dtstack.flinkx.mapping.NameMappingFlatMap;
import com.dtstack.flinkx.options.OptionParser;
import com.dtstack.flinkx.options.Options;
Expand All @@ -40,6 +41,7 @@
import com.dtstack.flinkx.throwable.FlinkxRuntimeException;
import com.dtstack.flinkx.throwable.JobConfigException;
import com.dtstack.flinkx.util.DataSyncFactoryUtil;
import com.dtstack.flinkx.util.DdlConventNameConvertUtil;
import com.dtstack.flinkx.util.ExecuteProcessHelper;
import com.dtstack.flinkx.util.FactoryHelper;
import com.dtstack.flinkx.util.JobUtil;
Expand Down Expand Up @@ -81,6 +83,11 @@
import java.util.Optional;
import java.util.Properties;

import static com.dtstack.flinkx.util.PluginUtil.READER_SUFFIX;
import static com.dtstack.flinkx.util.PluginUtil.SINK_SUFFIX;
import static com.dtstack.flinkx.util.PluginUtil.SOURCE_SUFFIX;
import static com.dtstack.flinkx.util.PluginUtil.WRITER_SUFFIX;

/**
* The main class entry
*
Expand Down Expand Up @@ -178,21 +185,17 @@ private static void exeSyncJob(
SourceFactory sourceFactory = DataSyncFactoryUtil.discoverSource(config, env);
DataStream<RowData> dataStreamSource = sourceFactory.createSource();

dataStreamSource = addNameMapping(config, dataStreamSource);
if (!config.getCdcConf().isSkipDDL()) {
CdcConf cdcConf = config.getCdcConf();
Pair<FetcherBase, StoreBase> monitorPair =
DataSyncFactoryUtil.discoverMonitor(cdcConf.getMonitor(), config);
DataSyncFactoryUtil.discoverFetchBase(cdcConf.getMonitor(), config);
dataStreamSource =
dataStreamSource.flatMap(
new RestorationFlatMap(
monitorPair.getLeft(), monitorPair.getRight(), cdcConf));
}

if (config.getNameMappingConf() != null) {
NameMappingConf mappingConf = config.getNameMappingConf();
dataStreamSource = dataStreamSource.flatMap(new NameMappingFlatMap(mappingConf));
}

SpeedConf speed = config.getSpeed();
if (speed.getReaderChannel() > 1) {
dataStreamSource =
Expand Down Expand Up @@ -220,7 +223,7 @@ private static void exeSyncJob(
if (speed.getWriterChannel() > 0) {
dataStreamSink.setParallelism(speed.getWriterChannel());
}

env.disableOperatorChaining();
JobExecutionResult result = env.execute(options.getJobName());
if (env instanceof MyLocalStreamEnvironment) {
PrintUtil.printResult(result.getAllAccumulatorResults());
Expand Down Expand Up @@ -351,4 +354,55 @@ private static void checkTableConf(OperatorConf operatorConf) {
throw new JobConfigException(operatorConf.getName(), "table.tableName", "is missing");
}
}

private static DataStream<RowData> addNameMapping(
SyncConf config, DataStream<RowData> dataStreamSource) {
boolean executeDdlAble =
(boolean) config.getWriter().getParameter().getOrDefault("executeDdlAble", false);

if (config.getNameMappingConf() != null || executeDdlAble) {
boolean needSqlConvent = executeDdlAble;

// 相同类型数据源且没有映射关系 此时不需要ddl转换
if (needSqlConvent
&& config.getWriter().getName().equals(config.getReader().getName())
&& config.getNameMappingConf() == null) {
needSqlConvent = false;
}

if (config.getNameMappingConf() != null
&& config.getNameMappingConf().getSqlConevnt() != null) {
needSqlConvent = config.getNameMappingConf().getSqlConevnt();
}

if (needSqlConvent) {
String readerName = config.getReader().getName();
String writerName = config.getWriter().getName();

readerName =
DdlConventNameConvertUtil.convertPackageName(
readerName.replace(READER_SUFFIX, "").replace(SOURCE_SUFFIX, ""));
writerName =
DdlConventNameConvertUtil.convertPackageName(
writerName.replace(WRITER_SUFFIX, "").replace(SINK_SUFFIX, ""));

DdlConvent sourceConvent = DataSyncFactoryUtil.discoverDdlConvent(readerName);
DdlConvent sinkConvent = DataSyncFactoryUtil.discoverDdlConvent(writerName);
dataStreamSource =
dataStreamSource.flatMap(
new NameMappingFlatMap(
config.getNameMappingConf(),
sourceConvent,
sinkConvent,
new SendProcessHandler()));
} else {
dataStreamSource =
dataStreamSource.flatMap(
new NameMappingFlatMap(
config.getNameMappingConf(), null, null, null));
}
}

return dataStreamSource;
}
}
20 changes: 20 additions & 0 deletions flinkx-core/src/main/java/com/dtstack/flinkx/cdc/DdlRowData.java
Original file line number Diff line number Diff line change
Expand Up @@ -181,4 +181,24 @@ public String getLsn() {
public String[] getHeaders() {
return headers;
}

public String[] getInfos() {
return ddlInfos;
}

public void replaceData(String original, String another) {
int index = getIndex(original);
if (index != -1) {
this.ddlInfos[index] = another;
}
}

public int getIndex(String header) {
for (int i = 0; i < headers.length; i++) {
if (headers[i].equals(header)) {
return i;
}
}
return -1;
}
}
Loading