diff --git a/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/converter/HdfsTextColumnConverter.java b/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/converter/HdfsTextColumnConverter.java index fd016203ab..07717c71a4 100644 --- a/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/converter/HdfsTextColumnConverter.java +++ b/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/converter/HdfsTextColumnConverter.java @@ -107,6 +107,23 @@ public RowData toInternalLookup(RowData input) { throw new ChunJunRuntimeException("HDFS Connector doesn't support Lookup Table Function."); } + @Override + protected IDeserializationConverter wrapIntoNullableInternalConverter( + IDeserializationConverter IDeserializationConverter) { + return val -> { + if (val == null || "".equals(val)) { + return null; + } else { + try { + return IDeserializationConverter.deserialize(val); + } catch (Exception e) { + LOG.error("value [{}] convent failed ", val); + throw e; + } + } + }; + } + @Override @SuppressWarnings("unchecked") protected ISerializationConverter wrapIntoNullableExternalConverter( diff --git a/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/source/HdfsTextInputFormat.java b/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/source/HdfsTextInputFormat.java index 38b3989827..3e0b9b13c3 100644 --- a/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/source/HdfsTextInputFormat.java +++ b/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/source/HdfsTextInputFormat.java @@ -123,7 +123,8 @@ public RowData nextRecordInternal(RowData rowData) throws ReadRecordException { ((Text) value).getLength(), hdfsConf.getEncoding()); String[] fields = - StringUtils.splitPreserveAllTokens(line, hdfsConf.getFieldDelimiter()); + StringUtils.splitByWholeSeparatorPreserveAllTokens( + line, hdfsConf.getFieldDelimiter()); List fieldConfList = hdfsConf.getColumn(); GenericRowData genericRowData; diff --git a/chunjun-connectors/chunjun-connector-oraclelogminer/src/main/java/com/dtstack/chunjun/connector/oraclelogminer/listener/LogMinerListener.java b/chunjun-connectors/chunjun-connector-oraclelogminer/src/main/java/com/dtstack/chunjun/connector/oraclelogminer/listener/LogMinerListener.java index 0ce7479cb5..abb73278e2 100644 --- a/chunjun-connectors/chunjun-connector-oraclelogminer/src/main/java/com/dtstack/chunjun/connector/oraclelogminer/listener/LogMinerListener.java +++ b/chunjun-connectors/chunjun-connector-oraclelogminer/src/main/java/com/dtstack/chunjun/connector/oraclelogminer/listener/LogMinerListener.java @@ -524,7 +524,9 @@ private BigInteger getLockTableScn(Connection conn, String tbnWithSchema) { } /* generate create table ddl */ - initialTableStruct(conn); + if (logMinerConf.isInitialTableStructure()) { + initialTableStruct(conn); + } /* release lock */ stmt.execute(SqlUtil.releaseTableLock());