From bf00af56c818fe9e3b9b894af7ec46a177926bd3 Mon Sep 17 00:00:00 2001 From: zhao_wei_nan <326747337@qq.com> Date: Thu, 16 Jun 2022 13:11:20 +0800 Subject: [PATCH 1/2] [hotfix-#948][jdbc] OracleSink writes to multi-table Column mapping error, causing write failure --- .../jdbc/sink/DynamicPreparedStmt.java | 38 ++++--------------- 1 file changed, 8 insertions(+), 30 deletions(-) diff --git a/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/sink/DynamicPreparedStmt.java b/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/sink/DynamicPreparedStmt.java index fc1532df50..5e88a1ecd9 100644 --- a/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/sink/DynamicPreparedStmt.java +++ b/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/sink/DynamicPreparedStmt.java @@ -84,14 +84,7 @@ public static DynamicPreparedStmt buildStmt( dynamicPreparedStmt.getColumnMeta(schemaName, tableName, connection); dynamicPreparedStmt.buildRowConvert(); - String sql = - dynamicPreparedStmt.prepareTemplates( - rowKind, - schemaName, - tableName, - jdbcConf.getUniqueKey().toArray(new String[0]), - jdbcConf.getMode(), - jdbcConf.isAllReplace()); + String sql = dynamicPreparedStmt.prepareTemplates(rowKind, schemaName, tableName, jdbcConf); String[] fieldNames = new String[dynamicPreparedStmt.columnNameList.size()]; dynamicPreparedStmt.columnNameList.toArray(fieldNames); dynamicPreparedStmt.fieldNamedPreparedStatement = @@ -120,14 +113,7 @@ public static DynamicPreparedStmt buildStmt( dynamicPreparedStmt.columnNameList.add(fieldConf.getName()); dynamicPreparedStmt.columnTypeList.add(fieldConf.getType()); } - String sql = - dynamicPreparedStmt.prepareTemplates( - rowKind, - schemaName, - tableName, - jdbcConf.getUniqueKey().toArray(new String[0]), - jdbcConf.getMode(), - jdbcConf.isAllReplace()); + String sql = dynamicPreparedStmt.prepareTemplates(rowKind, schemaName, tableName, jdbcConf); dynamicPreparedStmt.fieldNamedPreparedStatement = FieldNamedPreparedStatementImpl.prepareStatement(connection, sql, fieldNames); return dynamicPreparedStmt; @@ -151,19 +137,12 @@ public static DynamicPreparedStmt buildStmt( } protected String prepareTemplates( - RowKind rowKind, - String schemaName, - String tableName, - String[] uniqueKeys, - String mode, - boolean allReplace) { + RowKind rowKind, String schemaName, String tableName, JdbcConf jdbcConf) { String singleSql = null; switch (rowKind) { case INSERT: case UPDATE_AFTER: - singleSql = - this.getInsertStatementWithWriteMode( - mode, schemaName, tableName, uniqueKeys, allReplace); + singleSql = this.getInsertStatementWithWriteMode(jdbcConf, schemaName, tableName); break; case DELETE: case UPDATE_BEFORE: @@ -180,12 +159,9 @@ protected String prepareTemplates( } protected String getInsertStatementWithWriteMode( - String mode, - String schemaName, - String tableName, - String[] uniqueKeys, - boolean allReplace) { + JdbcConf jdbcConf, String schemaName, String tableName) { String singleSql; + String mode = jdbcConf.getMode(); if (EWriteMode.INSERT.name().equalsIgnoreCase(mode)) { singleSql = jdbcDialect.getInsertIntoStatement( @@ -197,6 +173,8 @@ protected String getInsertStatementWithWriteMode( schemaName, tableName, columnNameList.toArray(new String[0])) .get(); } else if (EWriteMode.UPDATE.name().equalsIgnoreCase(mode)) { + String[] uniqueKeys = jdbcConf.getUniqueKey().toArray(new String[0]); + boolean allReplace = jdbcConf.isAllReplace(); singleSql = jdbcDialect .getUpsertStatement( From 8e7054548d78c34d3cd217dd2ddf14a5053956dc Mon Sep 17 00:00:00 2001 From: zhao_wei_nan <326747337@qq.com> Date: Thu, 16 Jun 2022 23:27:19 +0800 Subject: [PATCH 2/2] [hotfix-822][stream] Add Stream Connector, print rowData RowKind in first Column. --- .../connector/jdbc/sink/JdbcOutputFormat.java | 3 +- .../jdbc/sink/PreparedStmtProxy.java | 7 +- jobs/test.json | 72 +++++++++++++++++ jobs/test1.json | 79 +++++++++++++++++++ jobs/test3.json | 79 +++++++++++++++++++ 5 files changed, 238 insertions(+), 2 deletions(-) create mode 100644 jobs/test.json create mode 100644 jobs/test1.json create mode 100644 jobs/test3.json diff --git a/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/sink/JdbcOutputFormat.java b/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/sink/JdbcOutputFormat.java index 7d4b44a73a..60ffff26d7 100644 --- a/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/sink/JdbcOutputFormat.java +++ b/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/sink/JdbcOutputFormat.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package com.dtstack.chunjun.connector.jdbc.sink; import com.dtstack.chunjun.cdc.DdlRowData; @@ -116,7 +117,7 @@ public void buildStmtProxy() throws SQLException { String tableInfo = jdbcConf.getTable(); if ("*".equalsIgnoreCase(tableInfo)) { - stmtProxy = new PreparedStmtProxy(dbConn, jdbcDialect, false); + stmtProxy = new PreparedStmtProxy(dbConn, jdbcDialect, false, jdbcConf); } else { FieldNamedPreparedStatement fieldNamedPreparedStatement = FieldNamedPreparedStatement.prepareStatement( diff --git a/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/sink/PreparedStmtProxy.java b/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/sink/PreparedStmtProxy.java index 8a4a6952dd..8fb058ce58 100644 --- a/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/sink/PreparedStmtProxy.java +++ b/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/sink/PreparedStmtProxy.java @@ -90,10 +90,15 @@ public class PreparedStmtProxy implements FieldNamedPreparedStatement { /** 是否将框架额外添加的扩展信息写入到数据库,默认不写入* */ protected boolean writeExtInfo; - public PreparedStmtProxy(Connection connection, JdbcDialect jdbcDialect, boolean writeExtInfo) { + public PreparedStmtProxy( + Connection connection, + JdbcDialect jdbcDialect, + boolean writeExtInfo, + JdbcConf jdbcConf) { this.connection = connection; this.jdbcDialect = jdbcDialect; this.writeExtInfo = writeExtInfo; + this.jdbcConf = jdbcConf; initCache(true); } diff --git a/jobs/test.json b/jobs/test.json new file mode 100644 index 0000000000..a1470447f5 --- /dev/null +++ b/jobs/test.json @@ -0,0 +1,72 @@ +{ + "job": { + "content": [ + { + "nameMapping": { + "schemaMappings": { + "LOGMINER_TEST": "LOGMINER_TEST" + }, + "tableMappings": { + "ZJH_LG_FROM2": "luming_adb" + } + }, + "reader": { + "parameter": { + "jdbcUrl": "jdbc:oracle:thin:@localhost:1521:helowin", + "username": "logminer_test", + "password": "123456", + "supportAutoAddLog": false, + "table": [ + "LOGMINER_TEST.ZJH_LG_FROM2" + ], + "cat": "INSERT,UPDATE,DELETE", + "split": true, + "readPosition": "CURRENT", + "queryTimeout": 3000 + }, + "name": "oraclelogminerreader" + }, + "writer": { + "parameter": { + "column": [ + { + "name": "id", + "type": "int" + }, + { + "name": "name", + "type": "string" + } + ], + "print": true + }, + "table": { + "tableName": "sinkTable" + }, + "name": "streamwriter" + } + } + ], + "setting": { + "speed": { + "bytes": 0, + "channel": 1 + }, + "errorLimit": { + "record": 1 + }, + "restore": { + "maxRowNumForCheckpoint": 0, + "isRestore": false, + "restoreColumnName": "", + "restoreColumnIndex": 0 + }, + "log": { + "isLogger": true, + "level": "debug", + "path": "", + "pattern": "" + } + } + } +} diff --git a/jobs/test1.json b/jobs/test1.json new file mode 100644 index 0000000000..afe3077739 --- /dev/null +++ b/jobs/test1.json @@ -0,0 +1,79 @@ +{ + "job": { + "content": [ + { + "nameMapping": { + "schemaMappings": { + "LOGMINER_TEST": "LOGMINER_TEST" + }, + "tableMappings": { + "ZJH_LG_FROM2": "ZJH_LG_TO2" + }, + "fieldMappings": { + "LOGMINER_TEST": { + "ZJH_LG_FROM2": { + "ID": "id", + "NAME": "name" + } + } + } + }, + "reader": { + "parameter": { + "jdbcUrl": "jdbc:oracle:thin:@localhost:1521:helowin", + "username": "logminer_test", + "password": "123456", + "supportAutoAddLog": false, + "table": [ + "LOGMINER_TEST.ZJH_LG_FROM2" + ], + "cat": "INSERT,UPDATE,DELETE", + "readPosition": "CURRENT", + "queryTimeout": 3000 + }, + "name": "oraclelogminerreader" + }, + "writer": { + "parameter": { + "writeMode": "insert", + "updateKey": [], + "allReplace": true, + "username": "logminer_test", + "password": "123456", + "connection": [ + { + "schema": "LOGMINER_TEST", + "jdbcUrl": "jdbc:oracle:thin:@localhost:1521:helowin", + "table": [ + "*" + ] + } + ] + }, + "name": "oraclewriter" + } + } + ], + "setting": { + "speed": { + "bytes": 0, + "channel": 1 + }, + "errorLimit": { + "record": 1 + }, + "restore": { + "maxRowNumForCheckpoint": 0, + "isRestore": false, + "restoreColumnName": "", + "restoreColumnIndex": 0 + }, + "log": { + "isLogger": true, + "level": "debug", + "path": "", + "pattern": "" + } + } + } +} diff --git a/jobs/test3.json b/jobs/test3.json new file mode 100644 index 0000000000..74d6208771 --- /dev/null +++ b/jobs/test3.json @@ -0,0 +1,79 @@ +{ + "job": { + "content": [ + { + "nameMapping": { + "schemaMappings": { + "TIEZHU": "public" + }, + "tableMappings": { + "TIEZHU": { + "TEST_TWO": "test_two", + "two": "table2" + } + }, + "fieldMappings": { + "TIEZHU": { + "TEST_TWO": { + "ID": "id", + "_DATE": "_date_", + "_TIMESTAMP": "_timestamp_", + "_STR": "_str_" + }, + "two": { + "id": "id", + "name": "username" + } + } + } + }, + "reader": { + "parameter": { + "schema": "TIEZHU", + "password": "abc123", + "split": true, + "cat": "insert,update,delete", + "jdbcUrl": "jdbc:oracle:thin:@172.16.100.243:1521:orcl", + "readPosition": "current", + "pavingData": false, + "table": [ + "TIEZHU.TEST_TWO" + ], + "username": "tiezhu" + }, + "name": "oraclelogminerreader", + "type": 2 + }, + "writer": { + "parameter": { + "password": "abc123", + "connection": [ + { + "jdbcUrl": "jdbc:postgresql://172.16.101.246:5432/tiezhu", + "table": [ + "*" + ] + } + ], + "writeMode": "APPEND", + "username": "admin" + }, + "name": "postgresqlwriter" + } + } + ], + "setting": { + "restore": { + "isRestore": true, + "isStream": true + }, + "errorLimit": {}, + "speed": { + "readerChannel": 1, + "writerChannel": 1, + "bytes": -1048576, + "channel": 1 + } + } + } +}