From 3e066073a08a7e43b4b82245866fe4115297a72e Mon Sep 17 00:00:00 2001
From: conghe2402
Date: Thu, 17 Nov 2022 19:26:53 +0800
Subject: [PATCH] fix issue:hdfs inputformat split columns issues and deal with null value of column correctly

---
Review note: relative to the original submission, the added converter now has Javadoc, logs
the caught exception itself, and spells "convert" correctly in the log message. The post-image
blob id on the HdfsTextColumnConverter.java index line was not recomputed.

 .../hdfs/converter/HdfsTextColumnConverter.java | 27 +++++++++++++++++++++++++++
 .../hdfs/source/HdfsTextInputFormat.java        |  3 ++-
 .../listener/LogMinerListener.java              |  4 +++-
 3 files changed, 32 insertions(+), 2 deletions(-)

diff --git a/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/converter/HdfsTextColumnConverter.java b/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/converter/HdfsTextColumnConverter.java
index fd016203ab..07717c71a4 100644
--- a/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/converter/HdfsTextColumnConverter.java
+++ b/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/converter/HdfsTextColumnConverter.java
@@ -107,6 +107,33 @@ public RowData toInternalLookup(RowData input) {
         throw new ChunJunRuntimeException("HDFS Connector doesn't support Lookup Table Function.");
     }
 
+    /**
+     * Wraps the given converter so that {@code null} and empty-string values are mapped to
+     * {@code null} without invoking it; every other value is passed to the converter.
+     *
+     * <p>If the converter throws, the offending value and the exception are logged and the
+     * exception is rethrown.
+     *
+     * @param IDeserializationConverter the converter to delegate non-empty values to
+     * @return a converter that applies the null/empty check before delegating
+     */
+    @Override
+    protected IDeserializationConverter wrapIntoNullableInternalConverter(
+            IDeserializationConverter IDeserializationConverter) {
+        return val -> {
+            if (val == null || "".equals(val)) {
+                return null;
+            } else {
+                try {
+                    return IDeserializationConverter.deserialize(val);
+                } catch (Exception e) {
+                    LOG.error("value [{}] convert failed ", val, e);
+                    throw e;
+                }
+            }
+        };
+    }
+
     @Override
     @SuppressWarnings("unchecked")
     protected ISerializationConverter wrapIntoNullableExternalConverter(
diff --git a/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/source/HdfsTextInputFormat.java b/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/source/HdfsTextInputFormat.java
index 38b3989827..3e0b9b13c3 100644
--- a/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/source/HdfsTextInputFormat.java
+++ b/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/source/HdfsTextInputFormat.java
@@ -123,7 +123,8 @@ public RowData nextRecordInternal(RowData rowData) throws ReadRecordException {
                                 ((Text) value).getLength(),
                                 hdfsConf.getEncoding());
                 String[] fields =
-                        StringUtils.splitPreserveAllTokens(line, hdfsConf.getFieldDelimiter());
+                        StringUtils.splitByWholeSeparatorPreserveAllTokens(
+                                line, hdfsConf.getFieldDelimiter());
 
                 List fieldConfList = hdfsConf.getColumn();
                 GenericRowData genericRowData;
diff --git a/chunjun-connectors/chunjun-connector-oraclelogminer/src/main/java/com/dtstack/chunjun/connector/oraclelogminer/listener/LogMinerListener.java b/chunjun-connectors/chunjun-connector-oraclelogminer/src/main/java/com/dtstack/chunjun/connector/oraclelogminer/listener/LogMinerListener.java
index 0ce7479cb5..abb73278e2 100644
--- a/chunjun-connectors/chunjun-connector-oraclelogminer/src/main/java/com/dtstack/chunjun/connector/oraclelogminer/listener/LogMinerListener.java
+++ b/chunjun-connectors/chunjun-connector-oraclelogminer/src/main/java/com/dtstack/chunjun/connector/oraclelogminer/listener/LogMinerListener.java
@@ -524,7 +524,9 @@ private BigInteger getLockTableScn(Connection conn, String tbnWithSchema) {
             }
 
             /* generate create table ddl */
-            initialTableStruct(conn);
+            if (logMinerConf.isInitialTableStructure()) {
+                initialTableStruct(conn);
+            }
 
             /* release lock */
             stmt.execute(SqlUtil.releaseTableLock());