diff --git a/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/sink/BaseHdfsOutputFormat.java b/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/sink/BaseHdfsOutputFormat.java index 562b7753ef..4a41aa5b81 100644 --- a/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/sink/BaseHdfsOutputFormat.java +++ b/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/sink/BaseHdfsOutputFormat.java @@ -38,7 +38,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; -import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -119,6 +118,12 @@ protected void checkOutputDir() { } } + /** 文件分隔符(File.separatorChar)在windows为\,而在linux中为/,在hadoop中路径需要固定为/, */ + protected char getHdfsPathChar() { + // hadoop 文件系统固定为/,避免路径不对,文件写入错误及移动失败 + return '/'; + } + @Override protected void deleteDataDir() { deleteDirectory(outputFilePath); @@ -160,7 +165,7 @@ public String getExtension() { @Override protected long getCurrentFileSize() { - String path = tmpPath + File.separatorChar + currentFileName; + String path = tmpPath + getHdfsPathChar() + currentFileName; try { if (hdfsConf.getMaxFileSize() > ConstantValue.STORE_SIZE_G) { return fs.getFileStatus(new Path(path)).getLen(); @@ -203,7 +208,7 @@ protected void deleteDataFiles(List preCommitFilePathList, String path) String currentFilePath = ""; try { for (String fileName : this.preCommitFilePathList) { - currentFilePath = path + File.separatorChar + fileName; + currentFilePath = path + getHdfsPathChar() + fileName; Path commitFilePath = new Path(currentFilePath); fs.delete(commitFilePath, true); LOG.info("delete file:{}", currentFilePath); diff --git a/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/sink/HdfsOrcOutputFormat.java b/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/sink/HdfsOrcOutputFormat.java index ebe5b21edf..1083a6a1fb 100644 --- a/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/sink/HdfsOrcOutputFormat.java +++ b/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/sink/HdfsOrcOutputFormat.java @@ -51,7 +51,6 @@ import org.apache.hadoop.mapred.RecordWriter; import org.apache.hadoop.mapred.Reporter; -import java.io.File; import java.io.IOException; import java.lang.reflect.Field; import java.util.ArrayList; @@ -163,7 +162,7 @@ protected void nextBlock() { } try { - String currentBlockTmpPath = tmpPath + File.separatorChar + currentFileName; + String currentBlockTmpPath = tmpPath + getHdfsPathChar() + currentFileName; recordWriter = outputFormat.getRecordWriter(null, jobConf, currentBlockTmpPath, Reporter.NULL); currentFileIndex++; diff --git a/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/sink/HdfsParquetOutputFormat.java b/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/sink/HdfsParquetOutputFormat.java index 728e58b078..4bcb19d35e 100644 --- a/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/sink/HdfsParquetOutputFormat.java +++ b/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/sink/HdfsParquetOutputFormat.java @@ -48,7 +48,6 @@ import org.apache.parquet.schema.PrimitiveType; import org.apache.parquet.schema.Types; -import java.io.File; import java.io.IOException; import java.security.PrivilegedAction; import java.util.HashMap; @@ -95,7 +94,7 @@ protected void nextBlock() { } try { - String currentBlockTmpPath = tmpPath + File.separatorChar + currentFileName; + String currentBlockTmpPath = tmpPath + getHdfsPathChar() + currentFileName; Path writePath = new Path(currentBlockTmpPath); // Compatible with old code diff --git a/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/sink/HdfsTextOutputFormat.java b/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/sink/HdfsTextOutputFormat.java index d418e18f7b..6c913bbe40 100644 --- a/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/sink/HdfsTextOutputFormat.java +++ b/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/sink/HdfsTextOutputFormat.java @@ -32,7 +32,6 @@ import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream; import org.apache.hadoop.fs.Path; -import java.io.File; import java.io.IOException; import java.io.OutputStream; @@ -55,7 +54,7 @@ protected void nextBlock() { } try { - String currentBlockTmpPath = tmpPath + File.separatorChar + currentFileName; + String currentBlockTmpPath = tmpPath + getHdfsPathChar() + currentFileName; Path p = new Path(currentBlockTmpPath); if (CompressType.TEXT_NONE.equals(compressType)) {