From c9e95ffefdcdf4c6df0db48c5cd2997848999f74 Mon Sep 17 00:00:00 2001 From: dujie Date: Fri, 16 Sep 2022 14:59:25 +0800 Subject: [PATCH] [hotfix]oracle type modify on sql & adjust to get unique key method --- .../connector/jdbc/sink/JdbcOutputFormat.java | 2 +- .../chunjun/connector/jdbc/util/JdbcUtil.java | 17 +++++++++++++++++ .../oracle/converter/OracleRowConverter.java | 12 ++++++++---- .../converter/OracleRawTypeConverter.java | 5 +++-- 4 files changed, 29 insertions(+), 7 deletions(-) diff --git a/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/sink/JdbcOutputFormat.java b/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/sink/JdbcOutputFormat.java index 883e3a911b..84e30e1e4a 100644 --- a/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/sink/JdbcOutputFormat.java +++ b/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/sink/JdbcOutputFormat.java @@ -98,7 +98,7 @@ protected void openInternal(int taskNumber, int numTasks) { List updateKey = jdbcConf.getUniqueKey(); if (CollectionUtils.isEmpty(updateKey)) { List tableIndex = - JdbcUtil.getTableIndex( + JdbcUtil.getTableUniqueIndex( jdbcConf.getSchema(), jdbcConf.getTable(), dbConn); jdbcConf.setUniqueKey(tableIndex); LOG.info("updateKey = {}", JsonUtil.toJson(tableIndex)); diff --git a/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/util/JdbcUtil.java b/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/util/JdbcUtil.java index c7999f953b..40792f649c 100644 --- a/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/util/JdbcUtil.java +++ b/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/util/JdbcUtil.java @@ -32,6 +32,7 @@ import org.apache.flink.table.types.logical.LogicalType; +import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang.StringUtils; import org.apache.commons.lang3.tuple.Pair; import org.slf4j.Logger; @@ -177,6 +178,22 @@ public static List getTableIndex(String schema, String tableName, Connec return indexList; } + public static List getTableUniqueIndex( + String schema, String tableName, Connection dbConn) throws SQLException { + List tablePrimaryKey = getTablePrimaryKey(schema, tableName, dbConn); + if (CollectionUtils.isNotEmpty(tablePrimaryKey)) { + return tablePrimaryKey; + } + + ResultSet rs = dbConn.getMetaData().getIndexInfo(null, schema, tableName, true, false); + List indexList = new LinkedList<>(); + while (rs.next()) { + String index = rs.getString(9); + if (StringUtils.isNotBlank(index)) indexList.add(index); + } + return indexList; + } + /** * get primarykey * diff --git a/chunjun-connectors/chunjun-connector-oracle/src/main/java/com/dtstack/chunjun/connector/oracle/converter/OracleRowConverter.java b/chunjun-connectors/chunjun-connector-oracle/src/main/java/com/dtstack/chunjun/connector/oracle/converter/OracleRowConverter.java index 1730210969..e643c61f7e 100644 --- a/chunjun-connectors/chunjun-connector-oracle/src/main/java/com/dtstack/chunjun/connector/oracle/converter/OracleRowConverter.java +++ b/chunjun-connectors/chunjun-connector-oracle/src/main/java/com/dtstack/chunjun/connector/oracle/converter/OracleRowConverter.java @@ -152,7 +152,8 @@ protected IDeserializationConverter createInternalConverter(LogicalType type) { } }; case CHAR: - if (((CharType) type).getLength() > CLOB_LENGTH) { + if (((CharType) type).getLength() > CLOB_LENGTH + && ((VarCharType) type).getLength() != Integer.MAX_VALUE) { return val -> { oracle.sql.CLOB clob = (oracle.sql.CLOB) val; return StringData.fromString(ConvertUtil.convertClob(clob)); @@ -160,7 +161,8 @@ protected IDeserializationConverter createInternalConverter(LogicalType type) { } return val -> StringData.fromString(val.toString()); case VARCHAR: - if (((VarCharType) type).getLength() > CLOB_LENGTH) { + if (((VarCharType) type).getLength() > CLOB_LENGTH + && ((VarCharType) type).getLength() != Integer.MAX_VALUE) { return val -> { oracle.sql.CLOB clob = (oracle.sql.CLOB) val; return StringData.fromString(ConvertUtil.convertClob(clob)); @@ -200,7 +202,8 @@ protected ISerializationConverter createExternalCon case DOUBLE: return (val, index, statement) -> statement.setDouble(index, val.getDouble(index)); case CHAR: - if (((CharType) type).getLength() > CLOB_LENGTH) { + if (((CharType) type).getLength() > CLOB_LENGTH + && ((VarCharType) type).getLength() != Integer.MAX_VALUE) { return (val, index, statement) -> { try (StringReader reader = new StringReader(val.getString(index).toString())) { @@ -213,7 +216,8 @@ protected ISerializationConverter createExternalCon }; case VARCHAR: // value is BinaryString - if (((VarCharType) type).getLength() > CLOB_LENGTH) { + if (((VarCharType) type).getLength() > CLOB_LENGTH + && ((VarCharType) type).getLength() != Integer.MAX_VALUE) { return (val, index, statement) -> { try (StringReader reader = new StringReader(val.getString(index).toString())) { diff --git a/chunjun-connectors/chunjun-connector-oraclelogminer/src/main/java/com/dtstack/chunjun/connector/oraclelogminer/converter/OracleRawTypeConverter.java b/chunjun-connectors/chunjun-connector-oraclelogminer/src/main/java/com/dtstack/chunjun/connector/oraclelogminer/converter/OracleRawTypeConverter.java index 89ff1fcddc..5b0e412e42 100644 --- a/chunjun-connectors/chunjun-connector-oraclelogminer/src/main/java/com/dtstack/chunjun/connector/oraclelogminer/converter/OracleRawTypeConverter.java +++ b/chunjun-connectors/chunjun-connector-oraclelogminer/src/main/java/com/dtstack/chunjun/connector/oraclelogminer/converter/OracleRawTypeConverter.java @@ -52,14 +52,15 @@ public static DataType apply(String type) { case "NCHAR": case "NVARCHAR2": case "LONG": - case "RAW": - case "LONG RAW": case "BLOB": case "CLOB": case "NCLOB": case "INTERVAL YEAR": case "INTERVAL DAY": return DataTypes.STRING(); + case "RAW": + case "LONG RAW": + return DataTypes.BYTES(); case "INT": case "INTEGER": return DataTypes.INT();