diff --git a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/client/Data.java b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/client/Data.java index 2b79fca4b5..facdd300a2 100644 --- a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/client/Data.java +++ b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/client/Data.java @@ -18,22 +18,33 @@ package com.dtstack.chunjun.connector.ftp.client; +import com.dtstack.chunjun.connector.ftp.extend.ftp.FtpParseException; import com.dtstack.chunjun.connector.ftp.handler.Position; -/** return from ftpSeqBufferedReader contains line and position */ public class Data { private String[] data; private Position position; + private FtpParseException exception; public Data(String[] data, Position position) { this.data = data; this.position = position; } + public Data(String[] data, Position position, FtpParseException exception) { + this.data = data; + this.position = position; + this.exception = exception; + } + public String[] getData() { return data; } + public FtpParseException getException() { + return exception; + } + public Position getPosition() { return position; } diff --git a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/client/FileUtil.java b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/client/FileUtil.java index 194f92b43c..e3fdfb3b17 100644 --- a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/client/FileUtil.java +++ b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/client/FileUtil.java @@ -18,146 +18,14 @@ package com.dtstack.chunjun.connector.ftp.client; -import com.dtstack.chunjun.connector.ftp.conf.FtpConfig; -import com.dtstack.chunjun.connector.ftp.handler.FtpHandler; -import com.dtstack.chunjun.connector.ftp.handler.IFtpHandler; -import com.dtstack.chunjun.connector.ftp.source.FtpFileSplit; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.io.InputStream; -import java.nio.charset.Charset; -import java.util.List; -import java.util.Locale; -import java.util.zip.ZipEntry; -import java.util.zip.ZipInputStream; - public class FileUtil { private static final Logger LOG = LoggerFactory.getLogger(FileUtil.class); - public static void addCompressFile( - IFtpHandler ftpHandler, - String filePath, - FtpConfig ftpConfig, - List fileList) - throws IOException { - if ("ZIP".equals(ftpConfig.getCompressType().toUpperCase(Locale.ENGLISH))) { - try (ZipInputStream zipInputStream = - new ZipInputStream( - ftpHandler.getInputStream(filePath), - Charset.forName(ftpConfig.encoding))) { - ZipEntry zipEntry; - while ((zipEntry = zipInputStream.getNextEntry()) != null) { - fileList.add( - new FtpFileSplit( - 0, - zipEntry.getSize(), - filePath, - zipEntry.getName(), - ftpConfig.getCompressType())); - } - closeWithFtpHandler(ftpHandler, LOG); - } - } else { - throw new RuntimeException("not support compressType " + ftpConfig.getCompressType()); - } - } - - public static void closeWithFtpHandler(IFtpHandler ftpHandler, Logger log) { - if (ftpHandler instanceof FtpHandler) { - try { - ((FtpHandler) ftpHandler).getFtpClient().completePendingCommand(); - } catch (Exception e) { - log.warn("FTPClient completePendingCommand has error ->", e); - try { - ftpHandler.logoutFtpServer(); - } catch (Exception exception) { - log.warn("FTPClient logout has error ->", exception); - } - } - } - } - public static String getFilename(String filepath) { String[] paths = filepath.split("/"); return paths[paths.length - 1]; } - - /** analyse file */ - public static void addFile( - IFtpHandler ftpHandler, - String filePath, - FtpConfig ftpConfig, - List fileList) - throws Exception { - long maxFetchSize = ftpConfig.getMaxFetchSize(); - - // fetchSize should bigger than 1M - maxFetchSize = Math.max(maxFetchSize, 1024 * 1024); - - long currentFileSize = ftpHandler.getFileSize(filePath); - int parallelism = ftpConfig.getParallelism(); - - String filename = getFilename(filePath); - - // do not split excel - if (ftpConfig.getFileType() == null - || ftpConfig.getFileType().equals("excel") - || ftpConfig.getFileType().equals("custom")) { - FtpFileSplit ftpFileSplit = new FtpFileSplit(0, currentFileSize, filePath, filename); - fileList.add(ftpFileSplit); - return; - } - - // split file - if (maxFetchSize < currentFileSize) { - int perSplit = Math.min((int) currentFileSize / parallelism, Integer.MAX_VALUE); - long startPosition = 0; - long endPosition = startPosition + perSplit; - - while (endPosition <= currentFileSize) { - if (endPosition == currentFileSize) { - FtpFileSplit ftpFileSplit = - new FtpFileSplit(startPosition, endPosition, filePath, filename); - fileList.add(ftpFileSplit); - break; - } - - InputStream input = ftpHandler.getInputStreamByPosition(filePath, endPosition); - char c = ' '; - - while (c != '\n') { - c = (char) input.read(); - endPosition += 1; - } - FtpFileSplit ftpFileSplit = - new FtpFileSplit(startPosition, endPosition, filePath, filename); - fileList.add(ftpFileSplit); - - LOG.info( - String.format( - "build file split, filename: %s, startPosition: %d, endPosition: %d", - filePath, startPosition, endPosition)); - - startPosition = endPosition; - endPosition = startPosition + perSplit; - } - - if (startPosition != currentFileSize) { - FtpFileSplit ftpFileSplit = - new FtpFileSplit(startPosition, currentFileSize, filePath, filename); - fileList.add(ftpFileSplit); - - LOG.info( - String.format( - "build file split, filename: %s, startPosition: %d, endPosition: %d", - filePath, startPosition, currentFileSize)); - } - } else { - FtpFileSplit ftpFileSplit = new FtpFileSplit(0, currentFileSize, filePath, filename); - fileList.add(ftpFileSplit); - } - } } diff --git a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/client/ZipInputStream.java b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/client/ZipInputStream.java index 44e6469598..940b84086b 100644 --- a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/client/ZipInputStream.java +++ b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/client/ZipInputStream.java @@ -29,13 +29,12 @@ import java.util.List; import java.util.zip.ZipEntry; -/** zip文件流 如果fileNameList不为空 只会读取fileNameList里的文件* */ public class ZipInputStream extends InputStream { private static final Logger LOG = LoggerFactory.getLogger(ZipInputStream.class); - private java.util.zip.ZipInputStream zipInputStream; + private final java.util.zip.ZipInputStream zipInputStream; + private final List fileNameList; private ZipEntry currentZipEntry; - private List fileNameList; public ZipInputStream(InputStream in) { this.zipInputStream = new java.util.zip.ZipInputStream(in); diff --git a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/client/excel/ExcelReaderExecutor.java b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/client/excel/ExcelReaderExecutor.java index f985cd3dc4..eb019fde3f 100644 --- a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/client/excel/ExcelReaderExecutor.java +++ b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/client/excel/ExcelReaderExecutor.java @@ -23,7 +23,7 @@ public class ExcelReaderExecutor implements Runnable { private final ExcelReader reader; - private final ExcelSubExceptionCarrier ec; + private ExcelSubExceptionCarrier ec; public ExcelReaderExecutor(ExcelReader reader, ExcelSubExceptionCarrier ec) { this.reader = reader; diff --git a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/client/excel/ExcelSubExceptionCarrier.java b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/client/excel/ExcelSubExceptionCarrier.java index b7098af6a7..6e6c4fe48a 100644 --- a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/client/excel/ExcelSubExceptionCarrier.java +++ b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/client/excel/ExcelSubExceptionCarrier.java @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, diff --git a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/conf/FtpConfig.java b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/conf/FtpConfig.java index 1cd3a01f28..da856160d4 100644 --- a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/conf/FtpConfig.java +++ b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/conf/FtpConfig.java @@ -19,6 +19,7 @@ package com.dtstack.chunjun.connector.ftp.conf; import com.dtstack.chunjun.conf.BaseFileConf; +import com.dtstack.chunjun.connector.ftp.enums.FileType; import com.dtstack.chunjun.constants.ConstantValue; import java.util.Map; @@ -47,6 +48,9 @@ public class FtpConfig extends BaseFileConf { private String ftpFileName; + /** 批量写入数据太大,会导致ftp协议缓冲区报错, 批量写入默认值设置小点 */ + private long nextCheckRows = 100; + public String encoding = "UTF-8"; /** 空值替换 */ @@ -58,12 +62,34 @@ public class FtpConfig extends BaseFileConf { /** User defined format class name */ private String customFormatClassName; + /** User defined split class name */ + private String customConcurrentFileSplitClassName; + + /* 行分隔符 */ + private String columnDelimiter = "\n"; + /** Get the specified fileReadClient according to the filetype * */ - public String fileType; + public String fileType = FileType.TXT.name(); /** 压缩格式 * */ public String compressType; + public String getColumnDelimiter() { + return columnDelimiter; + } + + public void setColumnDelimiter(String columnDelimiter) { + this.columnDelimiter = columnDelimiter; + } + + public String getCustomConcurrentFileSplitClassName() { + return customConcurrentFileSplitClassName; + } + + public void setCustomConcurrentFileSplitClassName(String customConcurrentFileSplitClassName) { + this.customConcurrentFileSplitClassName = customConcurrentFileSplitClassName; + } + public String getCustomFormatClassName() { return customFormatClassName; } @@ -238,6 +264,16 @@ public void setMaxFetchSize(long fetchSize) { this.maxFetchSize = fetchSize; } + @Override + public long getNextCheckRows() { + return nextCheckRows; + } + + @Override + public void setNextCheckRows(long nextCheckRows) { + this.nextCheckRows = nextCheckRows; + } + public long getMaxFetchSize() { return this.maxFetchSize; } @@ -259,10 +295,16 @@ public String toString() { .add("listHiddenFiles=" + listHiddenFiles) .add("maxFetchSize=" + maxFetchSize) .add("ftpFileName='" + ftpFileName + "'") + .add("nextCheckRows=" + nextCheckRows) .add("encoding='" + encoding + "'") .add("nullIsReplacedWithValue=" + nullIsReplacedWithValue) .add("fileConfig=" + fileConfig) .add("customFormatClassName='" + customFormatClassName + "'") + .add( + "customConcurrentFileSplitClassName='" + + customConcurrentFileSplitClassName + + "'") + .add("columnDelimiter='" + columnDelimiter + "'") .add("fileType='" + fileType + "'") .add("compressType='" + compressType + "'") .toString(); diff --git a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/converter/FtpColumnConverter.java b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/converter/FtpColumnConverter.java index ed1af80781..0ec710caa3 100644 --- a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/converter/FtpColumnConverter.java +++ b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/converter/FtpColumnConverter.java @@ -18,7 +18,6 @@ package com.dtstack.chunjun.connector.ftp.converter; -import com.dtstack.chunjun.conf.FieldConf; import com.dtstack.chunjun.connector.ftp.conf.FtpConfig; import com.dtstack.chunjun.converter.AbstractRowConverter; import com.dtstack.chunjun.converter.IDeserializationConverter; @@ -47,26 +46,21 @@ import java.util.ArrayList; import java.util.List; -/** - * @program chunjun - * @author: xiuzhu - * @create: 2021/06/19 - */ -public class FtpColumnConverter extends AbstractRowConverter { +public class FtpColumnConverter + extends AbstractRowConverter { private final FtpConfig ftpConfig; public FtpColumnConverter(RowType rowType, FtpConfig ftpConfig) { - super(rowType); + super(rowType, ftpConfig); this.ftpConfig = ftpConfig; for (int i = 0; i < rowType.getFieldCount(); i++) { - FieldConf fieldConf = ftpConfig.getColumn().get(i); toInternalConverters.add( wrapIntoNullableInternalConverter( createInternalConverter(rowType.getTypeAt(i)))); toExternalConverters.add( wrapIntoNullableExternalConverter( - createExternalConverter(fieldConf), fieldConf)); + createExternalConverter(rowType.getTypeAt(i)), rowType.getTypeAt(i))); } } @@ -96,7 +90,7 @@ public String toExternal(RowData rowData, String output) throws Exception { StringBuilder sb = new StringBuilder(128); List columnData = new ArrayList<>(ftpConfig.getColumn().size()); - for (int index = 0; index < rowData.getArity(); index++) { + for (int index = 0; index < toExternalConverters.size(); index++) { toExternalConverters.get(index).serialize(rowData, index, columnData); if (index != 0) { sb.append(ftpConfig.getFieldDelimiter()); @@ -109,7 +103,7 @@ public String toExternal(RowData rowData, String output) throws Exception { @Override @SuppressWarnings("unchecked") protected ISerializationConverter> wrapIntoNullableExternalConverter( - ISerializationConverter serializationConverter, FieldConf fieldConf) { + ISerializationConverter serializationConverter, LogicalType logicalType) { return (rowData, index, list) -> { if (rowData == null || rowData.isNullAt(index)) { list.add(index, null); @@ -119,6 +113,7 @@ protected ISerializationConverter> wrapIntoNullableExternalConverte }; } + @Override protected IDeserializationConverter createInternalConverter(LogicalType type) { switch (type.getTypeRoot()) { case BOOLEAN: @@ -159,8 +154,27 @@ protected IDeserializationConverter createInternalConverter(LogicalType type) { } @Override - protected ISerializationConverter> createExternalConverter(FieldConf fieldConf) { - return (rowData, index, list) -> - list.add(index, ((ColumnRowData) rowData).getField(index).asString()); + protected ISerializationConverter> createExternalConverter( + LogicalType logicalType) { + switch (logicalType.getTypeRoot()) { + case DATE: + return (rowData, index, list) -> { + if (rowData instanceof ColumnRowData) { + list.add( + index, + ((ColumnRowData) rowData).getField(index).asSqlDate().toString()); + } else { + list.add(index, ((GenericRowData) rowData).getField(index).toString()); + } + }; + default: + return (rowData, index, list) -> { + if (rowData instanceof ColumnRowData) { + list.add(index, ((ColumnRowData) rowData).getField(index).asString()); + } else { + list.add(index, ((GenericRowData) rowData).getField(index).toString()); + } + }; + } } } diff --git a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/converter/FtpRawTypeConverter.java b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/converter/FtpRawTypeConverter.java index 595379ed5f..cac99580fc 100644 --- a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/converter/FtpRawTypeConverter.java +++ b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/converter/FtpRawTypeConverter.java @@ -27,11 +27,6 @@ import java.util.Locale; -/** - * @program chunjun - * @author: xiuzhu - * @create: 2021/06/28 - */ public class FtpRawTypeConverter { public static DataType apply(String type) throws UnsupportedTypeException { diff --git a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/converter/FtpRowConverter.java b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/converter/FtpRowConverter.java index 1a2292e46c..8dc8d61e21 100644 --- a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/converter/FtpRowConverter.java +++ b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/converter/FtpRowConverter.java @@ -25,11 +25,6 @@ import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.LogicalType; -/** - * @program chunjun - * @author: xiuzhu - * @create: 2021/06/19 - */ public class FtpRowConverter extends AbstractRowConverter { private DeserializationSchema valueDeserialization; diff --git a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/enums/FileType.java b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/enums/FileType.java index 93786fb8b4..ca0e1698a9 100644 --- a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/enums/FileType.java +++ b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/enums/FileType.java @@ -27,8 +27,7 @@ public enum FileType { CSV, EXCEL, /** For user-defined file types, the IFileReadFormat interface needs to be implemented.* */ - CUSTOM, - ; + CUSTOM; public static FileType getType(String type) { try { diff --git a/chunjun-connectors/chunjun-connector-ftp/src/test/java/com/dtstack/chunjun/connector/ftp/format/IFormatConfigTest.java b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/extend/converter/IColumnConverter.java similarity index 62% rename from chunjun-connectors/chunjun-connector-ftp/src/test/java/com/dtstack/chunjun/connector/ftp/format/IFormatConfigTest.java rename to chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/extend/converter/IColumnConverter.java index 4dd2dc2d1f..782d8f36b2 100644 --- a/chunjun-connectors/chunjun-connector-ftp/src/test/java/com/dtstack/chunjun/connector/ftp/format/IFormatConfigTest.java +++ b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/extend/converter/IColumnConverter.java @@ -16,24 +16,10 @@ * limitations under the License. */ -package com.dtstack.chunjun.connector.ftp.format; +package com.dtstack.chunjun.connector.ftp.extend.converter; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; +import java.io.Serializable; -public class IFormatConfigTest { - - private IFormatConfig iFormatConfigUnderTest; - - @Before - public void setUp() { - iFormatConfigUnderTest = new IFormatConfig(); - } - - @Test - public void testToString() { - Assert.assertTrue( - iFormatConfigUnderTest.toString().contains(IFormatConfig.class.getSimpleName())); - } +public interface IColumnConverter extends Serializable { + T convert(Object source) throws Exception; } diff --git a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/client/File.java b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/extend/ftp/File.java similarity index 98% rename from chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/client/File.java rename to chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/extend/ftp/File.java index 591f30438a..1f5183d0eb 100644 --- a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/client/File.java +++ b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/extend/ftp/File.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package com.dtstack.chunjun.connector.ftp.client; +package com.dtstack.chunjun.connector.ftp.extend.ftp; import java.io.Serializable; import java.util.StringJoiner; diff --git a/chunjun-connectors/chunjun-connector-ftp/src/test/java/com/dtstack/chunjun/connector/ftp/client/excel/ExcelSubExceptionCarrierTest.java b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/extend/ftp/FtpParseException.java similarity index 56% rename from chunjun-connectors/chunjun-connector-ftp/src/test/java/com/dtstack/chunjun/connector/ftp/client/excel/ExcelSubExceptionCarrierTest.java rename to chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/extend/ftp/FtpParseException.java index 786318f37c..8e52400dea 100644 --- a/chunjun-connectors/chunjun-connector-ftp/src/test/java/com/dtstack/chunjun/connector/ftp/client/excel/ExcelSubExceptionCarrierTest.java +++ b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/extend/ftp/FtpParseException.java @@ -16,25 +16,29 @@ * limitations under the License. */ -package com.dtstack.chunjun.connector.ftp.client.excel; +package com.dtstack.chunjun.connector.ftp.extend.ftp; -import org.junit.Before; -import org.junit.Test; +import java.io.IOException; -public class ExcelSubExceptionCarrierTest { +public class FtpParseException extends IOException { + private String content; - private ExcelSubExceptionCarrier excelSubExceptionCarrierUnderTest; - - @Before - public void setUp() { - excelSubExceptionCarrierUnderTest = new ExcelSubExceptionCarrier(); + /** + * @param content 解析的内容 + * @param errMsg 报错原因 + * @param cause + */ + public FtpParseException(String content, String errMsg, Throwable cause) { + super(errMsg, cause); + this.content = content; } - @Test(expected = RuntimeException.class) - public void setExcelSubExceptionCarrierUnderTest() throws Exception { - RuntimeException test = new RuntimeException("Test"); - excelSubExceptionCarrierUnderTest.setThrowable(test); + public String getContent() { + return content; + } - throw excelSubExceptionCarrierUnderTest.getThrowable(); + @Override + public String toString() { + return "FtpParseException{" + "content='" + content + '\'' + '}'; } } diff --git a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/format/IFormatConfig.java b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/extend/ftp/IFormatConfig.java similarity index 73% rename from chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/format/IFormatConfig.java rename to chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/extend/ftp/IFormatConfig.java index 0ed5fe2173..5fba54bc63 100644 --- a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/format/IFormatConfig.java +++ b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/extend/ftp/IFormatConfig.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package com.dtstack.chunjun.connector.ftp.format; +package com.dtstack.chunjun.connector.ftp.extend.ftp; import java.io.Serializable; import java.util.Arrays; @@ -27,12 +27,28 @@ public class IFormatConfig implements Serializable { private static final long serialVersionUID = 1L; - private String fieldDelimiter; private String encoding; private String[] fields; private boolean isFirstLineHeader; private Map fileConfig; + private int parallelism; + private long fetchMaxSize; + + /* 列分隔符 */ + private String fieldDelimiter; + + /* 行分隔符 */ + private String columnDelimiter; + + public void setColumnDelimiter(String columnDelimiter) { + this.columnDelimiter = columnDelimiter; + } + + public String getColumnDelimiter() { + return columnDelimiter; + } + public boolean isFirstLineHeader() { return isFirstLineHeader; } @@ -73,14 +89,33 @@ public void setFields(String[] fields) { this.fields = fields; } + public int getParallelism() { + return parallelism; + } + + public void setParallelism(int parallelism) { + this.parallelism = parallelism; + } + + public void setFetchMaxSize(long fetchMaxSize) { + this.fetchMaxSize = fetchMaxSize; + } + + public long getFetchMaxSize() { + return fetchMaxSize; + } + @Override public String toString() { return new StringJoiner(", ", IFormatConfig.class.getSimpleName() + "[", "]") - .add("fieldDelimiter='" + fieldDelimiter + "'") .add("encoding='" + encoding + "'") .add("fields=" + Arrays.toString(fields)) .add("isFirstLineHeader=" + isFirstLineHeader) .add("fileConfig=" + fileConfig) + .add("parallelism=" + parallelism) + .add("fetchMaxSize=" + fetchMaxSize) + .add("fieldDelimiter='" + fieldDelimiter + "'") + .add("columnDelimiter='" + columnDelimiter + "'") .toString(); } } diff --git a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/handler/IFtpHandler.java b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/extend/ftp/IFtpHandler.java similarity index 79% rename from chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/handler/IFtpHandler.java rename to chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/extend/ftp/IFtpHandler.java index 71956d631d..81ad023f2c 100644 --- a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/handler/IFtpHandler.java +++ b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/extend/ftp/IFtpHandler.java @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -16,32 +16,15 @@ * limitations under the License. */ -package com.dtstack.chunjun.connector.ftp.handler; - -import com.dtstack.chunjun.connector.ftp.conf.FtpConfig; +package com.dtstack.chunjun.connector.ftp.extend.ftp; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.util.List; -/** The skeleton of Ftp Utility class */ public interface IFtpHandler extends AutoCloseable { - /** - * 登录服务器 - * - * @param ftpConfig 连接配置 - */ - void loginFtpServer(FtpConfig ftpConfig); - - /** - * 登出服务器 - * - * @throws IOException logout error - */ - void logoutFtpServer() throws IOException; - /** * 判断给定的目录是否存在 * @@ -56,7 +39,7 @@ public interface IFtpHandler extends AutoCloseable { * @param filePath 要检查的文件路径 * @return true:存在,false:不存在 */ - boolean isFileExist(String filePath) throws IOException; + boolean isFileExist(String filePath); /** * 获取文件输入流 @@ -71,10 +54,18 @@ public interface IFtpHandler extends AutoCloseable { * * @param filePath 文件路径 * @param startPosition 指定的位置 - * @return input stream. + * @return */ InputStream getInputStreamByPosition(String filePath, long startPosition); + /** + * 列出指定路径下的目录 + * + * @param path 路径 + * @return 目录列表 + */ + List listDirs(String path); + /** * 列出指定路径下的目录 * @@ -89,7 +80,7 @@ public interface IFtpHandler extends AutoCloseable { * @param path 路径 * @return 文件列表 */ - List getFiles(String path) throws IOException; + List getFiles(String path); /** * 递归创建目录 @@ -112,14 +103,14 @@ public interface IFtpHandler extends AutoCloseable { * @param dir 指定的目录 * @param exclude 要排除的文件 */ - void deleteAllFilesInDir(String dir, List exclude) throws IOException; + void deleteAllFilesInDir(String dir, List exclude); /** * 删除文件 * * @param filePath 文件路径 */ - void deleteFile(String filePath) throws IOException; + boolean deleteFile(String filePath) throws IOException; /** * 重命名路径 diff --git a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/extend/ftp/concurrent/ConcurrentFileSplit.java b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/extend/ftp/concurrent/ConcurrentFileSplit.java new file mode 100644 index 0000000000..64c6aa9ef8 --- /dev/null +++ b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/extend/ftp/concurrent/ConcurrentFileSplit.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.chunjun.connector.ftp.extend.ftp.concurrent; + +import com.dtstack.chunjun.connector.ftp.extend.ftp.IFormatConfig; +import com.dtstack.chunjun.connector.ftp.extend.ftp.IFtpHandler; + +import java.util.Comparator; +import java.util.List; + +public interface ConcurrentFileSplit { + + /** + * 切割文件, 生成 FtpFileSplit + * + * @param handler + * @param config + * @param files + * @return + */ + List buildFtpFileSplit( + IFtpHandler handler, IFormatConfig config, List files); + + /** + * 断点续传时候需要遍历FtpFileSplit,去除已经读过的文件, 所以需要保证FtpFileSplit的顺序; + * + * @return + */ + Comparator compare(); +} diff --git a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/source/FtpFileSplit.java b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/extend/ftp/concurrent/FtpFileSplit.java similarity index 93% rename from chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/source/FtpFileSplit.java rename to chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/extend/ftp/concurrent/FtpFileSplit.java index 8241a0305d..37bb29d1f8 100644 --- a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/source/FtpFileSplit.java +++ b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/extend/ftp/concurrent/FtpFileSplit.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package com.dtstack.chunjun.connector.ftp.source; +package com.dtstack.chunjun.connector.ftp.extend.ftp.concurrent; import java.io.Serializable; import java.util.StringJoiner; @@ -26,16 +26,16 @@ public class FtpFileSplit implements Serializable { private static final long serialVersionUID = 1L; /** 这个分片处理文件的开始位置 */ - private long startPosition; + private long startPosition = 0; /** 这个分片处理文件的结束位置 */ - private long endPosition; + private long endPosition = 0; /** 这个分片处理的文件名 */ - private final String filename; + private String filename; /** 文件完整路径 */ - private final String fileAbsolutePath; + private String fileAbsolutePath; /** 压缩类型 */ private String compressType; diff --git a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/format/IFileReadFormat.java b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/extend/ftp/format/IFileReadFormat.java similarity index 92% rename from chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/format/IFileReadFormat.java rename to chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/extend/ftp/format/IFileReadFormat.java index 090c8b37a5..47355d3753 100644 --- a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/format/IFileReadFormat.java +++ b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/extend/ftp/format/IFileReadFormat.java @@ -16,9 +16,10 @@ * limitations under the License. */ -package com.dtstack.chunjun.connector.ftp.format; +package com.dtstack.chunjun.connector.ftp.extend.ftp.format; -import com.dtstack.chunjun.connector.ftp.client.File; +import com.dtstack.chunjun.connector.ftp.extend.ftp.File; +import com.dtstack.chunjun.connector.ftp.extend.ftp.IFormatConfig; import java.io.Closeable; import java.io.IOException; diff --git a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/client/excel/ExcelReaderExceptionHandler.java b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/handler/DTFtpHandler.java similarity index 54% rename from chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/client/excel/ExcelReaderExceptionHandler.java rename to chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/handler/DTFtpHandler.java index c9f1124424..d6871ec900 100644 --- a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/client/excel/ExcelReaderExceptionHandler.java +++ b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/handler/DTFtpHandler.java @@ -16,22 +16,33 @@ * limitations under the License. */ -package com.dtstack.chunjun.connector.ftp.client.excel; +package com.dtstack.chunjun.connector.ftp.handler; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import com.dtstack.chunjun.connector.ftp.conf.FtpConfig; +import com.dtstack.chunjun.connector.ftp.extend.ftp.IFtpHandler; -/** @author by dujie @Description @Date 2021/12/20 */ -public class ExcelReaderExceptionHandler implements Thread.UncaughtExceptionHandler { +import java.io.IOException; - protected final Logger LOG = LoggerFactory.getLogger(getClass()); +public interface DTFtpHandler extends IFtpHandler { - @Override - public void uncaughtException(Thread t, Throwable e) { - LOG.error( - "an error occurred during the reading of the Excel file." - + " thread name : {}, message : {}", - t.getName(), - e.getMessage()); - } + /** + * 登录服务器 + * + * @param ftpConfig 连接配置 + */ + void loginFtpServer(FtpConfig ftpConfig); + + /** + * 登出服务器 + * + * @throws IOException logout error + */ + void logoutFtpServer() throws IOException; + + /** + * 关闭ftp输入流 + * + * @throws IOException 文件句柄操作异常 + */ + void completePendingCommand() throws IOException; } diff --git a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/handler/FtpHandler.java b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/handler/FtpHandler.java index adda70d0ae..63dacdac9a 100644 --- a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/handler/FtpHandler.java +++ b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/handler/FtpHandler.java @@ -22,6 +22,7 @@ import com.dtstack.chunjun.connector.ftp.enums.EFtpMode; import com.dtstack.chunjun.constants.ConstantValue; import com.dtstack.chunjun.throwable.ChunJunRuntimeException; +import com.dtstack.chunjun.util.ExceptionUtil; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.io.IOUtils; @@ -40,11 +41,11 @@ import java.util.ArrayList; import java.util.List; -/** The concrete Ftp Utility class used for standard ftp */ -public class FtpHandler implements IFtpHandler { +public class FtpHandler implements DTFtpHandler { private static final Logger LOG = LoggerFactory.getLogger(FtpHandler.class); + private static final String DISCONNECT_FAIL_MESSAGE = "Failed to disconnect from ftp server"; private static final String SP = "/"; private FTPClient ftpClient = null; private String controlEncoding; @@ -150,7 +151,7 @@ public boolean isDirExist(String directoryPath) { } @Override - public boolean isFileExist(String filePath) throws IOException { + public boolean isFileExist(String filePath) { ftpClient.enterLocalPassiveMode(); InputStream inputStream = null; try { @@ -162,14 +163,19 @@ public boolean isFileExist(String filePath) throws IOException { + filePath); } finally { if (inputStream != null) { - inputStream.close(); - ftpClient.completePendingCommand(); + try { + inputStream.close(); + inputStream = null; + ftpClient.completePendingCommand(); + } catch (IOException e) { + throw new ChunJunRuntimeException("close input stream error."); + } } } } @Override - public List getFiles(String path) throws IOException { + public List getFiles(String path) { List sources = new ArrayList<>(); ftpClient.enterLocalPassiveMode(); @@ -183,7 +189,18 @@ public List getFiles(String path) throws IOException { path = path + SP; } try { - listFiles(path, sources); + FTPFile[] ftpFiles = ftpClient.listFiles(encodePath(path)); + if (ftpFiles != null) { + for (FTPFile ftpFile : ftpFiles) { + // .和..是特殊文件 + if (StringUtils.endsWith(ftpFile.getName(), ConstantValue.POINT_SYMBOL) + || StringUtils.endsWith( + ftpFile.getName(), ConstantValue.TWO_POINT_SYMBOL)) { + continue; + } + sources.addAll(getFiles(path + ftpFile.getName(), ftpFile)); + } + } } catch (IOException e) { LOG.error("", e); throw new RuntimeException(e); @@ -191,28 +208,13 @@ public List getFiles(String path) throws IOException { return sources; } - private void listFiles(String path, List sources) throws IOException { - FTPFile[] ftpFiles = ftpClient.listFiles(encodePath(path)); - if (ftpFiles != null) { - for (FTPFile ftpFile : ftpFiles) { - // .和..是特殊文件 - if (StringUtils.endsWith(ftpFile.getName(), ConstantValue.POINT_SYMBOL) - || StringUtils.endsWith( - ftpFile.getName(), ConstantValue.TWO_POINT_SYMBOL)) { - continue; - } - sources.addAll(getFiles(path + ftpFile.getName(), ftpFile)); - } - } - } - /** * 递归获取指定路径下的所有文件(暂无过滤) * - * @param path 指定路径 - * @param file FTP file - * @return 文件内容 - * @throws IOException io exception + * @param path + * @param file + * @return + * @throws IOException */ private List getFiles(String path, FTPFile file) throws IOException { List sources = new ArrayList<>(); @@ -221,7 +223,17 @@ private List getFiles(String path, FTPFile file) throws IOException { path = path + SP; } ftpClient.enterLocalPassiveMode(); - listFiles(path, sources); + FTPFile[] ftpFiles = ftpClient.listFiles(encodePath(path)); + if (ftpFiles != null) { + for (FTPFile ftpFile : ftpFiles) { + if (StringUtils.endsWith(ftpFile.getName(), ConstantValue.POINT_SYMBOL) + || StringUtils.endsWith( + ftpFile.getName(), ConstantValue.TWO_POINT_SYMBOL)) { + continue; + } + sources.addAll(getFiles(path + ftpFile.getName(), ftpFile)); + } + } } else { sources.add(path); } @@ -310,7 +322,7 @@ private void printWorkingDirectory() { } @Override - public void deleteAllFilesInDir(String dir, List exclude) throws IOException { + public void deleteAllFilesInDir(String dir, List exclude) { if (isDirExist(dir)) { if (!dir.endsWith(SP)) { dir = dir + SP; @@ -351,21 +363,23 @@ public void deleteAllFilesInDir(String dir, List exclude) throws IOExcep } @Override - public void deleteFile(String filePath) throws IOException { + public boolean deleteFile(String filePath) throws IOException { try { if (isFileExist(filePath)) { - ftpClient.deleteFile(filePath); + return ftpClient.deleteFile(filePath); } } catch (IOException e) { throw new IOException(e); } + return true; } @Override public InputStream getInputStream(String filePath) { try { ftpClient.enterLocalPassiveMode(); - return ftpClient.retrieveFileStream(encodePath(filePath)); + InputStream is = ftpClient.retrieveFileStream(encodePath(filePath)); + return is; } catch (IOException e) { String message = String.format("读取文件 : [%s] 时出错,请确认文件:[%s]存在且配置的用户有权限读取", filePath, filePath); @@ -376,20 +390,66 @@ public InputStream getInputStream(String filePath) { @Override public InputStream getInputStreamByPosition(String filePath, long startPosition) { - throw new RuntimeException("ftp协议,暂不支持从指定位置读取文件"); + if (startPosition != 0) { + throw new RuntimeException("ftp协议,暂不支持从指定位置读取文件"); + } + + return getInputStream(filePath); + } + + @Override + public List listDirs(String path) { + List sources = new ArrayList<>(); + if (isDirExist(path)) { + if (!path.endsWith(SP)) { + path = path + SP; + } + + try { + FTPFile[] ftpFiles = ftpClient.listFiles(encodePath(path)); + if (ftpFiles != null) { + for (FTPFile ftpFile : ftpFiles) { + if (StringUtils.endsWith(ftpFile.getName(), ConstantValue.POINT_SYMBOL) + || StringUtils.endsWith( + ftpFile.getName(), ConstantValue.TWO_POINT_SYMBOL)) { + continue; + } + sources.add(path + ftpFile.getName()); + } + } + } catch (IOException e) { + LOG.error("", e); + throw new RuntimeException(e); + } + } + + return sources; } @Override public void rename(String oldPath, String newPath) throws IOException { - /* 兼容windows, 如果目标文件存在, rename会报错 */ + /** 兼容windows, 如果目标文件存在, rename会报错 */ if (this.isFileExist(newPath)) { LOG.info(String.format("[%s] exist, delete it before rename", newPath)); - deleteFile(newPath); + this.deleteFile(newPath); } ftpClient.rename(encodePath(oldPath), encodePath(newPath)); } + @Override + public void completePendingCommand() throws IOException { + try { + // throw exception when return false + if (!ftpClient.completePendingCommand()) { + throw new IOException("I/O error occurs while sending or receiving data"); + } + } catch (IOException e) { + LOG.error("I/O error occurs while sending or receiving data"); + throw new IOException(ExceptionUtil.getErrorMessage(e)); + } + } + @Override public long getFileSize(String path) throws IOException { reconnectFtp(); @@ -443,6 +503,6 @@ public void reconnectFtp() { @Override public void close() throws Exception { - logoutFtpServer(); + this.logoutFtpServer(); } } diff --git a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/handler/FtpHandlerFactory.java b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/handler/FtpHandlerFactory.java index ad5e80323a..f6fe06cde7 100644 --- a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/handler/FtpHandlerFactory.java +++ b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/handler/FtpHandlerFactory.java @@ -20,14 +20,10 @@ import org.apache.commons.lang3.StringUtils; -/** - * @author jiangbo - * @date 2019/11/21 - */ public class FtpHandlerFactory { - public static IFtpHandler createFtpHandler(String protocolStr) { - IFtpHandler ftpHandler; + public static DTFtpHandler createFtpHandler(String protocolStr) { + DTFtpHandler ftpHandler; Protocol protocol = Protocol.getByName(protocolStr); if (Protocol.SFTP.equals(protocol)) { diff --git a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/handler/Position.java b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/handler/Position.java index 4d1aa30b19..3c8ec0f41b 100644 --- a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/handler/Position.java +++ b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/handler/Position.java @@ -18,7 +18,7 @@ package com.dtstack.chunjun.connector.ftp.handler; -import com.dtstack.chunjun.connector.ftp.source.FtpFileSplit; +import com.dtstack.chunjun.connector.ftp.extend.ftp.concurrent.FtpFileSplit; import java.io.Serializable; import java.util.StringJoiner; @@ -27,10 +27,10 @@ public class Position implements Serializable { private static final long serialVersionUID = 1L; - /** 第几条数据* */ - private final Long currentReadPosition; - /** 读取的数据文件* */ - private final FtpFileSplit fileSplit; + /** 当前的文件偏移量 */ + private Long currentReadPosition; + /** 读取的数据文件 */ + private FtpFileSplit fileSplit; public Position(Long currentReadPosition, FtpFileSplit fileSplit) { this.currentReadPosition = currentReadPosition; diff --git a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/handler/SftpHandler.java b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/handler/SftpHandler.java index 4082aeaf41..f233862d0e 100644 --- a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/handler/SftpHandler.java +++ b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/handler/SftpHandler.java @@ -42,16 +42,15 @@ import java.util.Properties; import java.util.Vector; -/** The concrete Ftp Utility class used for sftp */ -public class SftpHandler implements IFtpHandler { +public class SftpHandler implements DTFtpHandler { private static final Logger LOG = LoggerFactory.getLogger(SftpHandler.class); private static final String DOT = "."; private static final String DOT_DOT = ".."; private static final String SP = "/"; private static final String SRC_MAIN = "src/main"; - private static final String[] PATH_NOT_EXIST_ERR = {"no such file", "is not a valid file path"}; - private static final String MSG_AUTH_FAIL = "Auth fail"; + private static String[] PATH_NOT_EXIST_ERR = {"no such file", "is not a valid file path"}; + private static String MSG_AUTH_FAIL = "Auth fail"; private Session session = null; private ChannelSftp channelSftp = null; @@ -225,6 +224,42 @@ public InputStream getInputStreamByPosition(String filePath, long startPosition) } } + @Override + public List listDirs(String path) { + if (StringUtils.isBlank(path)) { + path = SP; + } + + List dirs = new ArrayList<>(); + if (isDirExist(path)) { + if (path.equals(DOT) || path.equals(SRC_MAIN)) { + return dirs; + } + + if (!path.endsWith(SP)) { + path = path + SP; + } + + try { + Vector vector = channelSftp.ls(path); + for (int i = 0; i < vector.size(); ++i) { + ChannelSftp.LsEntry le = (ChannelSftp.LsEntry) vector.get(i); + String strName = le.getFilename(); + if (!strName.equals(DOT) + && !strName.equals(SRC_MAIN) + && !strName.equals(DOT_DOT)) { + String filePath = path + strName; + dirs.add(filePath); + } + } + } catch (SftpException e) { + LOG.error("", e); + } + } + + return dirs; + } + @Override public List getFiles(String path) { if (StringUtils.isBlank(path)) { @@ -239,9 +274,9 @@ public List getFiles(String path) { path = path + SP; } try { - Vector vector = channelSftp.ls(path); - for (Object o : vector) { - ChannelSftp.LsEntry le = (ChannelSftp.LsEntry) o; + Vector vector = channelSftp.ls(path); + for (int i = 0; i < vector.size(); ++i) { + ChannelSftp.LsEntry le = (ChannelSftp.LsEntry) vector.get(i); String strName = le.getFilename(); if (!strName.equals(DOT) && !strName.equals(SRC_MAIN) @@ -266,7 +301,7 @@ public List getFiles(String path) { public void mkDirRecursive(String directoryPath) { boolean isDirExist = false; try { - printWorkingDirectory(); + this.printWorkingDirectory(); SftpATTRS sftpAttrs = this.channelSftp.lstat(directoryPath); isDirExist = sftpAttrs.isDir(); } catch (SftpException e) { @@ -276,6 +311,7 @@ public void mkDirRecursive(String directoryPath) { String.format( "file not found, sftp server reply errorCode: [%d]", e.id); LOG.warn(message); + isDirExist = false; } } } @@ -338,9 +374,9 @@ public void deleteAllFilesInDir(String dir, List exclude) { } try { - Vector vector = channelSftp.ls(dir); - for (Object o : vector) { - ChannelSftp.LsEntry le = (ChannelSftp.LsEntry) o; + Vector vector = channelSftp.ls(dir); + for (int i = 0; i < vector.size(); ++i) { + ChannelSftp.LsEntry le = (ChannelSftp.LsEntry) vector.get(i); String strName = le.getFilename(); if (CollectionUtils.isNotEmpty(exclude) && exclude.contains(strName)) { continue; @@ -372,7 +408,7 @@ public void deleteAllFilesInDir(String dir, List exclude) { } @Override - public void deleteFile(String filePath) throws IOException { + public boolean deleteFile(String filePath) throws IOException { try { if (isFileExist(filePath)) { channelSftp.rm(filePath); @@ -380,36 +416,51 @@ public void deleteFile(String filePath) throws IOException { } catch (SftpException e) { throw new IOException(e); } + return true; } - public void mkDirSingleHierarchy(String directoryPath) throws SftpException { - boolean isDirExist; + public boolean mkDirSingleHierarchy(String directoryPath) throws SftpException { + boolean isDirExist = false; try { SftpATTRS sftpAttrs = this.channelSftp.lstat(directoryPath); isDirExist = sftpAttrs.isDir(); } catch (SftpException e) { - LOG.info("Creating a directory step by step [{}]", directoryPath); - this.channelSftp.mkdir(directoryPath); - return; + if (!isDirExist) { + LOG.info("Creating a directory step by step [{}]", directoryPath); + this.channelSftp.mkdir(directoryPath); + return true; + } } if (!isDirExist) { LOG.info("Creating a directory step by step [{}]", directoryPath); this.channelSftp.mkdir(directoryPath); } + return true; } @Override public void rename(String oldPath, String newPath) throws SftpException { + /** 兼容连通支付ftp数据源,如果目标文件存在会报错, 重命名前,先删除 */ + boolean deleteSuccess = true; if (this.isFileExist(newPath)) { try { LOG.info(String.format("[%s] exist, delete it before rename", newPath)); this.deleteFile(newPath); } catch (Exception e) { + deleteSuccess = false; throw new ChunJunRuntimeException(e); } } - channelSftp.rename(oldPath, newPath); + if (deleteSuccess) { + channelSftp.rename(oldPath, newPath); + } + } + + /** 仅ftp输入流需要显示关闭 */ + @Override + public void completePendingCommand() { + this.logoutFtpServer(); } @Override @@ -423,6 +474,6 @@ public long getFileSize(String path) throws IOException { @Override public void close() throws Exception { - logoutFtpServer(); + this.logoutFtpServer(); } } diff --git a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/format/CsvFileFormat.java b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/iformat/CsvFileFormat.java similarity index 91% rename from chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/format/CsvFileFormat.java rename to chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/iformat/CsvFileFormat.java index 594ac1f94b..a99013364e 100644 --- a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/format/CsvFileFormat.java +++ b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/iformat/CsvFileFormat.java @@ -16,9 +16,11 @@ * limitations under the License. */ -package com.dtstack.chunjun.connector.ftp.format; +package com.dtstack.chunjun.connector.ftp.iformat; -import com.dtstack.chunjun.connector.ftp.client.File; +import com.dtstack.chunjun.connector.ftp.extend.ftp.File; +import com.dtstack.chunjun.connector.ftp.extend.ftp.IFormatConfig; +import com.dtstack.chunjun.connector.ftp.extend.ftp.format.IFileReadFormat; import com.csvreader.CsvReader; import org.apache.commons.collections.MapUtils; diff --git a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/format/ExcelFileFormat.java b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/iformat/ExcelFileFormat.java similarity index 91% rename from chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/format/ExcelFileFormat.java rename to chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/iformat/ExcelFileFormat.java index 3ee3b8286e..d69b204684 100644 --- a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/format/ExcelFileFormat.java +++ b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/iformat/ExcelFileFormat.java @@ -16,13 +16,15 @@ * limitations under the License. */ -package com.dtstack.chunjun.connector.ftp.format; +package com.dtstack.chunjun.connector.ftp.iformat; -import com.dtstack.chunjun.connector.ftp.client.File; import com.dtstack.chunjun.connector.ftp.client.excel.ExcelReadListener; import com.dtstack.chunjun.connector.ftp.client.excel.ExcelReaderExecutor; import com.dtstack.chunjun.connector.ftp.client.excel.ExcelSubExceptionCarrier; import com.dtstack.chunjun.connector.ftp.client.excel.Row; +import com.dtstack.chunjun.connector.ftp.extend.ftp.File; +import com.dtstack.chunjun.connector.ftp.extend.ftp.IFormatConfig; +import com.dtstack.chunjun.connector.ftp.extend.ftp.format.IFileReadFormat; import com.alibaba.excel.EasyExcel; import com.alibaba.excel.ExcelReader; @@ -55,7 +57,7 @@ public class ExcelFileFormat implements IFileReadFormat { private Row row; @Override - public void open(File file, InputStream inputStream, IFormatConfig config) { + public void open(File file, InputStream inputStream, IFormatConfig config) throws IOException { LOG.info("open file : {}", file.getFileName()); this.cellCount = config.getFields().length; ExcelReadListener listener = new ExcelReadListener(); @@ -110,7 +112,7 @@ public boolean hasNext() throws IOException { } @Override - public String[] nextRecord() { + public String[] nextRecord() throws IOException { String[] data; if (row.isEnd()) { try { @@ -146,16 +148,16 @@ public void close() throws IOException { } private String[] formatValue(String[] data) { - String[] record = initDataContainer(cellCount); + String[] record = initDataContainer(cellCount, ""); // because cellCount is always >= data.length System.arraycopy(data, 0, record, 0, data.length); return record; } - private String[] initDataContainer(int capacity) { + private String[] initDataContainer(int capacity, String defValue) { String[] container = new String[capacity]; for (int i = 0; i < capacity; i++) { - container[i] = ""; + container[i] = defValue; } return container; } diff --git a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/format/IFormatFactory.java b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/iformat/IFormatFactory.java similarity index 91% rename from chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/format/IFormatFactory.java rename to chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/iformat/IFormatFactory.java index 69d9120128..0917e07fbe 100644 --- a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/format/IFormatFactory.java +++ b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/iformat/IFormatFactory.java @@ -16,9 +16,10 @@ * limitations under the License. */ -package com.dtstack.chunjun.connector.ftp.format; +package com.dtstack.chunjun.connector.ftp.iformat; import com.dtstack.chunjun.connector.ftp.enums.FileType; +import com.dtstack.chunjun.connector.ftp.extend.ftp.format.IFileReadFormat; public class IFormatFactory { @@ -36,7 +37,7 @@ public static IFileReadFormat create(FileType fileType, String className) { } public static IFileReadFormat createCustom(String className) { - Class userClass; + Class userClass = null; try { ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); userClass = classLoader.loadClass(className); diff --git a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/format/TextFileFormat.java b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/iformat/TextFileFormat.java similarity index 86% rename from chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/format/TextFileFormat.java rename to chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/iformat/TextFileFormat.java index fb22c4a674..86c12c8a5d 100644 --- a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/format/TextFileFormat.java +++ b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/iformat/TextFileFormat.java @@ -16,9 +16,11 @@ * limitations under the License. */ -package com.dtstack.chunjun.connector.ftp.format; +package com.dtstack.chunjun.connector.ftp.iformat; -import com.dtstack.chunjun.connector.ftp.client.File; +import com.dtstack.chunjun.connector.ftp.extend.ftp.File; +import com.dtstack.chunjun.connector.ftp.extend.ftp.IFormatConfig; +import com.dtstack.chunjun.connector.ftp.extend.ftp.format.IFileReadFormat; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; @@ -52,7 +54,7 @@ public boolean hasNext() throws IOException { } @Override - public String[] nextRecord() { + public String[] nextRecord() throws IOException { return StringUtils.splitByWholeSeparatorPreserveAllTokens(line, filedDelimiter); } diff --git a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/options/FtpOptions.java b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/options/FtpOptions.java index 83d358c53d..dbd5d8c550 100644 --- a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/options/FtpOptions.java +++ b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/options/FtpOptions.java @@ -23,11 +23,6 @@ import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; -/** - * @program chunjun - * @author: xiuzhu - * @create: 2021/06/19 - */ public class FtpOptions extends BaseFileOptions { public static final ConfigOption FORMAT = @@ -77,4 +72,16 @@ public class FtpOptions extends BaseFileOptions { .stringType() .defaultValue("PASV") .withDescription("ftp connectPattern"); + + public static final ConfigOption isFirstLineHeader = + ConfigOptions.key("isFirstLineHeader") + .booleanType() + .defaultValue(true) + .withDescription("FirstLineHeader"); + + public static final ConfigOption FILE_TYPE = + ConfigOptions.key("fileType") + .stringType() + .defaultValue("TXT") + .withDescription("FILE_TYPE"); } diff --git a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/sink/FtpDynamicTableSink.java b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/sink/FtpDynamicTableSink.java index 681af45477..4be0f775bc 100644 --- a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/sink/FtpDynamicTableSink.java +++ b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/sink/FtpDynamicTableSink.java @@ -30,11 +30,6 @@ import org.apache.flink.table.connector.sink.SinkFunctionProvider; import org.apache.flink.table.data.RowData; -/** - * @program chunjun - * @author: xiuzhu - * @create: 2021/06/19 - */ public class FtpDynamicTableSink implements DynamicTableSink { private final TableSchema physicalSchema; diff --git a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/sink/FtpOutputFormat.java b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/sink/FtpOutputFormat.java index 5a8398e8c0..9317cc55c5 100644 --- a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/sink/FtpOutputFormat.java +++ b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/sink/FtpOutputFormat.java @@ -19,8 +19,8 @@ package com.dtstack.chunjun.connector.ftp.sink; import com.dtstack.chunjun.connector.ftp.conf.FtpConfig; +import com.dtstack.chunjun.connector.ftp.handler.DTFtpHandler; import com.dtstack.chunjun.connector.ftp.handler.FtpHandlerFactory; -import com.dtstack.chunjun.connector.ftp.handler.IFtpHandler; import com.dtstack.chunjun.constants.ConstantValue; import com.dtstack.chunjun.enums.SizeUnitType; import com.dtstack.chunjun.sink.WriteMode; @@ -31,6 +31,8 @@ import org.apache.flink.table.data.RowData; +import org.apache.commons.lang3.StringUtils; + import java.io.BufferedWriter; import java.io.File; import java.io.IOException; @@ -39,8 +41,8 @@ import java.util.ArrayList; import java.util.List; import java.util.UUID; +import java.util.function.Function; -/** The OutputFormat Implementation which reads data from ftp servers. */ public class FtpOutputFormat extends BaseFileOutputFormat { /** 换行符 */ @@ -48,7 +50,7 @@ public class FtpOutputFormat extends BaseFileOutputFormat { protected FtpConfig ftpConfig; - private transient IFtpHandler ftpHandler; + private transient DTFtpHandler ftpHandler; private transient BufferedWriter writer; @@ -66,7 +68,7 @@ protected void openSource() {} @Override protected void checkOutputDir() { - wrapFtpHandler( + wrapFtpAction( iFtpHandler -> { if (iFtpHandler.isDirExist(tmpPath)) { if (iFtpHandler.isFileExist(tmpPath)) { @@ -76,17 +78,26 @@ protected void checkOutputDir() { } else { iFtpHandler.mkDirRecursive(tmpPath); } + return null; }); } @Override protected void deleteDataDir() { - wrapFtpHandler(iFtpHandler -> iFtpHandler.deleteAllFilesInDir(outputFilePath, null)); + wrapFtpAction( + iFtpHandler -> { + iFtpHandler.deleteAllFilesInDir(outputFilePath, null); + return null; + }); } @Override protected void deleteTmpDataDir() { - wrapFtpHandler(iFtpHandler -> iFtpHandler.deleteAllFilesInDir(tmpPath, null)); + wrapFtpAction( + iFtpHandler -> { + iFtpHandler.deleteAllFilesInDir(tmpPath, null); + return null; + }); } @Override @@ -105,6 +116,7 @@ protected void nextBlock() { } catch (IOException e) { throw new ChunJunRuntimeException(ExceptionUtil.getErrorMessage(e)); } + currentFileIndex++; } @@ -132,12 +144,12 @@ protected void checkCurrentFileSize() { } @Override - @SuppressWarnings("unchecked") public void writeSingleRecordToFile(RowData rowData) throws WriteRecordException { try { if (writer == null) { nextBlock(); } + String line = (String) rowConverter.toExternal(rowData, ""); this.writer.write(line); this.writer.write(NEWLINE); @@ -158,10 +170,15 @@ public void closeInternal() throws IOException { os.close(); os = null; } + this.ftpHandler.logoutFtpServer(); } catch (Exception e) { throw new ChunJunRuntimeException("can't close source.", e); } finally { - this.ftpHandler.logoutFtpServer(); + try { + this.ftpHandler.logoutFtpServer(); + } catch (IOException e) { + throw new ChunJunRuntimeException("can't logout ftp client.", e); + } } } @@ -237,8 +254,8 @@ protected void deleteDataFiles(List preCommitFilePathList, String path) @Override protected void moveAllTmpDataFileToDir() { - wrapFtpHandler( - handler -> { + wrapFtpAction( + (DTFtpHandler handler) -> { String dataFilePath = ""; try { List dataFiles = handler.getFiles(tmpPath); @@ -267,6 +284,7 @@ protected void moveAllTmpDataFileToDir() { dataFilePath, outputFilePath), e); } + return null; }); } @@ -292,7 +310,7 @@ protected long getCurrentFileSize() { @Override protected void writeMultipleRecordsInternal() { - throw new UnsupportedOperationException("FtpWriter don't support write batch data."); + notSupportBatchWrite("FtpWriter"); } public FtpConfig getFtpConfig() { @@ -303,23 +321,26 @@ public void setFtpConfig(FtpConfig ftpConfig) { this.ftpConfig = ftpConfig; } + protected void notSupportBatchWrite(String writerName) { + throw new UnsupportedOperationException(writerName + "不支持批量写入"); + } + private String handleUserSpecificFileName( - String tmpDataFileName, int fileNumber, List copyList, IFtpHandler handler) - throws IOException { + String tmpDataFileName, int fileNumber, List copyList, DTFtpHandler handler) { String fileName = ftpConfig.getFtpFileName(); - if (org.apache.commons.lang.StringUtils.isNotBlank(fileName)) { + if (StringUtils.isNotBlank(fileName)) { if (fileNumber == 1) { - fileName = handlerSingleFile(); + fileName = handlerSingleFile(tmpDataFileName); } else { fileName = handlerMultiChannel(tmpDataFileName); } } else { fileName = tmpDataFileName; } - return checkFilePath(fileName, copyList, handler); + return filepathCheck(fileName, copyList, handler); } - private String handlerSingleFile() { + private String handlerSingleFile(String tmpDataFileName) { return ftpConfig.getFtpFileName(); } @@ -344,8 +365,7 @@ private String handlerMultiChannel(String tmpDataFileName) { return fileName; } - private String checkFilePath(String filename, List copyList, IFtpHandler handler) - throws IOException { + private String filepathCheck(String filename, List copyList, DTFtpHandler handler) { if (WriteMode.NONCONFLICT.name().equalsIgnoreCase(ftpConfig.getWriteMode())) { if (handler.isFileExist(ftpConfig.getPath() + File.separatorChar + filename)) { handler.deleteAllFilesInDir(tmpPath, null); @@ -365,7 +385,7 @@ private String checkFilePath(String filename, List copyList, IFtpHandler } else if (WriteMode.INSERT.name().equalsIgnoreCase(ftpConfig.getWriteMode())) { if (handler.isFileExist(ftpConfig.getPath() + File.separatorChar + filename)) { String suffix = - org.apache.commons.lang.StringUtils.isNotBlank(ftpConfig.getSuffix()) + StringUtils.isNotBlank(ftpConfig.getSuffix()) ? "_" + ftpConfig.getSuffix() : "_" + UUID.randomUUID(); StringBuilder sb = new StringBuilder(filename); @@ -381,18 +401,20 @@ private String checkFilePath(String filename, List copyList, IFtpHandler return filename; } - private void wrapFtpHandler(Callback callback) { - try (IFtpHandler tmpFtpHandler = - FtpHandlerFactory.createFtpHandler(ftpConfig.getProtocol())) { - tmpFtpHandler.loginFtpServer(ftpConfig); - callback.apply(tmpFtpHandler); + private void wrapFtpAction(Function function) { + DTFtpHandler tmpFtpHandler = FtpHandlerFactory.createFtpHandler(ftpConfig.getProtocol()); + tmpFtpHandler.loginFtpServer(ftpConfig); + + try { + function.apply(tmpFtpHandler); } catch (Exception e) { throw new ChunJunRuntimeException(e); + } finally { + try { + tmpFtpHandler.logoutFtpServer(); + } catch (Exception e1) { + throw new ChunJunRuntimeException(e1); + } } } - - @FunctionalInterface - interface Callback { - void apply(T t) throws IOException; - } } diff --git a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/sink/FtpOutputFormatBuilder.java b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/sink/FtpOutputFormatBuilder.java index 8337bbebc3..d1b6bf829c 100644 --- a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/sink/FtpOutputFormatBuilder.java +++ b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/sink/FtpOutputFormatBuilder.java @@ -24,14 +24,9 @@ import org.apache.commons.lang3.StringUtils; -/** - * The builder of FtpOutputFormat - * - *

Company: www.dtstack.com - * - * @author huyifan.zju@163.com - */ -public class FtpOutputFormatBuilder extends FileOutputFormatBuilder { +public class FtpOutputFormatBuilder extends FileOutputFormatBuilder { + + private FtpOutputFormat format; public FtpOutputFormatBuilder() { super(new FtpOutputFormat()); @@ -49,7 +44,7 @@ protected void checkFormat() { throw new ChunJunRuntimeException("Please Set protocol"); } if (StringUtils.isBlank(ftpConfig.getHost())) { - throw new ChunJunRuntimeException("Please Set gost"); + throw new ChunJunRuntimeException("Please Set host"); } if (StringUtils.isBlank(ftpConfig.getPath())) { throw new ChunJunRuntimeException("Please Set path"); diff --git a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/sink/FtpSinkFactory.java b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/sink/FtpSinkFactory.java index ed88c8adb8..a9d5748b0b 100644 --- a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/sink/FtpSinkFactory.java +++ b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/sink/FtpSinkFactory.java @@ -38,11 +38,6 @@ import java.util.List; import java.util.stream.Collectors; -/** - * @program chunjun - * @author: xiuzhu - * @create: 2021/06/19 - */ public class FtpSinkFactory extends SinkFactory { private List columnName; diff --git a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/source/FtpDynamicTableSource.java b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/source/FtpDynamicTableSource.java index 5edaf84d38..2fb4f373db 100644 --- a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/source/FtpDynamicTableSource.java +++ b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/source/FtpDynamicTableSource.java @@ -34,11 +34,6 @@ import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; import org.apache.flink.table.types.logical.RowType; -/** - * @program: Chunjun - * @author: xiuzhu - * @create: 2021/06/19 - */ public class FtpDynamicTableSource implements ScanTableSource { private TableSchema schema; diff --git a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/source/FtpFileReader.java b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/source/FtpFileReader.java index 8e2adc0d51..2fc28036b2 100644 --- a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/source/FtpFileReader.java +++ b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/source/FtpFileReader.java @@ -19,16 +19,23 @@ package com.dtstack.chunjun.connector.ftp.source; import com.dtstack.chunjun.connector.ftp.client.Data; -import com.dtstack.chunjun.connector.ftp.client.File; -import com.dtstack.chunjun.connector.ftp.client.FileUtil; import com.dtstack.chunjun.connector.ftp.client.ZipInputStream; import com.dtstack.chunjun.connector.ftp.conf.FtpConfig; import com.dtstack.chunjun.connector.ftp.enums.FileType; -import com.dtstack.chunjun.connector.ftp.format.IFileReadFormat; -import com.dtstack.chunjun.connector.ftp.format.IFormatConfig; -import com.dtstack.chunjun.connector.ftp.format.IFormatFactory; -import com.dtstack.chunjun.connector.ftp.handler.IFtpHandler; +import com.dtstack.chunjun.connector.ftp.extend.ftp.File; +import com.dtstack.chunjun.connector.ftp.extend.ftp.FtpParseException; +import com.dtstack.chunjun.connector.ftp.extend.ftp.IFormatConfig; +import com.dtstack.chunjun.connector.ftp.extend.ftp.concurrent.FtpFileSplit; +import com.dtstack.chunjun.connector.ftp.extend.ftp.format.IFileReadFormat; +import com.dtstack.chunjun.connector.ftp.handler.DTFtpHandler; +import com.dtstack.chunjun.connector.ftp.handler.FtpHandler; +import com.dtstack.chunjun.connector.ftp.handler.FtpHandlerFactory; import com.dtstack.chunjun.connector.ftp.handler.Position; +import com.dtstack.chunjun.connector.ftp.iformat.IFormatFactory; +import com.dtstack.chunjun.metrics.BaseMetric; + +import org.apache.flink.api.common.accumulators.LongCounter; +import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; @@ -36,8 +43,6 @@ import java.io.IOException; import java.io.InputStream; -import java.nio.charset.Charset; -import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -45,27 +50,31 @@ import java.util.Locale; import java.util.Map; +import static com.dtstack.chunjun.connector.ftp.conf.ConfigConstants.FTP_COUNTER_PREFIX; + public class FtpFileReader { private static final Logger LOG = LoggerFactory.getLogger(FtpFileReader.class); - private final IFtpHandler ftpHandler; + private DTFtpHandler ftpHandler; private Iterator iter; private int fromLine = 0; /** the fileSplit current read * */ private FtpFileSplit currentFileSplit; - /** The bytes read from the current fileSplit * */ - private Long currentFileSplitReadBytes = 0L; - private Position startPosition; private final FtpConfig ftpConfig; private IFormatConfig iFormatConfig; private IFileReadFormat currentFileReadFormat; private final Map iFileReadFormatCache; + private RuntimeContext runtimeContext; + private Map counterMap; + private BaseMetric baseMetric; + private boolean enableMetric = false; + private FtpInputStreamProxy currentInputStream; public FtpFileReader( - IFtpHandler ftpHandler, + DTFtpHandler ftpHandler, Iterator iter, FtpConfig ftpConfig, Position startPosition) { @@ -79,20 +88,19 @@ public FtpFileReader( /** 断点续跑,过滤文件已经读过的部分 * */ public void skipHasReadFiles() { if (startPosition != null && startPosition.getCurrentReadPosition() > 0) { - FtpFileSplit storeFileSplit = startPosition.getFileSplit(); + FtpFileSplit storefs = startPosition.getFileSplit(); - /* + /** * remove same file name but endPosition < startPosition.getCurrentReadPosition() set * FtpFileSplit startPosition = startPosition.getCurrentReadPosition() */ ArrayList fileCache = new ArrayList<>(); for (Iterator it = iter; it.hasNext(); ) { FtpFileSplit fs = it.next(); - if (fs.getFileAbsolutePath().equals(storeFileSplit.getFileAbsolutePath())) { + if (fs.getFileAbsolutePath().equals(storefs.getFileAbsolutePath())) { if (fs.getEndPosition() <= startPosition.getCurrentReadPosition()) { // remove same file name but endPosition < // startPosition.getCurrentReadPosition() - // do nothing } else { fs.setStartPosition(startPosition.getCurrentReadPosition()); fileCache.add(fs); @@ -112,23 +120,50 @@ public Data readLine() throws IOException { } if (currentFileReadFormat != null) { - if (currentFileSplitReadBytes >= currentFileSplit.getReadLimit()) { - close(); - return readLine(); - } - if (!currentFileReadFormat.hasNext()) { close(); return readLine(); } - String[] record = currentFileReadFormat.nextRecord(); - addCurrentReadSize(record); + try { + String[] record = currentFileReadFormat.nextRecord(); + if (enableMetric) { + String readBytesMetricName = getMetricName("readBytes"); + LongCounter readBytesCounter = counterMap.get(readBytesMetricName); + setMetricValue(readBytesCounter, currentInputStream.getCurrentReadBytes()); - return new Data(record, new Position(currentFileSplitReadBytes, currentFileSplit)); - } + String metricName = getMetricName("readLines"); + LongCounter counter = counterMap.get(metricName); + updateMetric(counter, 1); + } + + return new Data( + record, + new Position( + currentInputStream.getCurrentReadBytes() + + currentFileSplit.getStartPosition(), + currentFileSplit)); + + } catch (FtpParseException e) { + String[] record = new String[] {e.getContent()}; + return new Data( + record, + new Position( + currentInputStream.getCurrentReadBytes() + + currentFileSplit.getStartPosition(), + currentFileSplit), + e); + } - return null; + } else { + if (enableMetric) { + // tick end + String metricName = getMetricName("tickEnd"); + LongCounter counter = counterMap.get(metricName); + tickEndMetric(counter); + } + return null; + } } private void nextStream() throws IOException { @@ -164,6 +199,8 @@ private void nextStream() throws IOException { iFileReadFormatCache.put(fileType, iFileReadFormat); } currentFileReadFormat = iFileReadFormatCache.get(fileType); + currentFileSplit = fileSplit; + currentInputStream = new FtpInputStreamProxy(in, currentFileSplit.getReadLimit()); // adapt to previous file parameter File file = @@ -172,10 +209,7 @@ private void nextStream() throws IOException { fileSplit.getFileAbsolutePath(), fileSplit.getFilename(), fileSplit.getCompressType()); - currentFileReadFormat.open(file, in, iFormatConfig); - - currentFileSplit = fileSplit; - currentFileSplitReadBytes = 0L; + currentFileReadFormat.open(file, currentInputStream, iFormatConfig); if (fileSplit.getStartPosition() == 0) { if (fileType != FileType.EXCEL) { @@ -183,7 +217,6 @@ private void nextStream() throws IOException { if (currentFileReadFormat.hasNext()) { String[] strings = currentFileReadFormat.nextRecord(); LOG.info("Skip line:{}", Arrays.toString(strings)); - addCurrentReadSize(strings); } else { break; } @@ -196,11 +229,30 @@ private void nextStream() throws IOException { } public void close() throws IOException { + + if (currentInputStream != null) { + currentInputStream.close(); + } + if (currentFileReadFormat != null) { currentFileReadFormat.close(); currentFileReadFormat = null; - FileUtil.closeWithFtpHandler(ftpHandler, LOG); + if (ftpHandler instanceof FtpHandler) { + try { + ((FtpHandler) ftpHandler).getFtpClient().completePendingCommand(); + } catch (Exception e) { + // 如果出现了超时异常,就直接获取一个新的ftpHandler + LOG.warn("FTPClient completePendingCommand has error ->", e); + try { + ftpHandler.logoutFtpServer(); + } catch (Exception exception) { + LOG.warn("FTPClient logout has error ->", exception); + } + ftpHandler = FtpHandlerFactory.createFtpHandler(ftpConfig.getProtocol()); + ftpHandler.loginFtpServer(ftpConfig); + } + } } } @@ -243,23 +295,102 @@ public String getCurrentFileName() { return currentFileSplit.getFileAbsolutePath(); } - public void setIFormatConfig(IFormatConfig iFormatConfig) { + public IFormatConfig getiFormatConfig() { + return iFormatConfig; + } + + public void setiFormatConfig(IFormatConfig iFormatConfig) { this.iFormatConfig = iFormatConfig; } - private void addCurrentReadSize(String[] value) { - String line = String.join(ftpConfig.getFieldDelimiter(), value); - currentFileSplitReadBytes += line.getBytes(getCharacterSet()).length; - currentFileSplitReadBytes += "\n".getBytes(getCharacterSet()).length; + public void enableMetric(RuntimeContext runtimeContext, BaseMetric baseMetric) { + this.runtimeContext = runtimeContext; + this.baseMetric = baseMetric; + counterMap = new HashMap<>(); + this.enableMetric = true; + + ArrayList fileCache = new ArrayList<>(); + + int totalFiles = 0; + StringBuilder allFileNamesBuilder = new StringBuilder(); + allFileNamesBuilder.append("ftp_read_files("); + + for (Iterator it = iter; it.hasNext(); ) { + FtpFileSplit file = it.next(); + String readLinesMetricName = getMetricName("readLines"); + String readBytesMetricName = getMetricName("readBytes"); + + counterMap.put(readLinesMetricName, registerMetric(readLinesMetricName, true)); + counterMap.put(readBytesMetricName, registerMetric(readBytesMetricName, false)); + + totalFiles += 1; + allFileNamesBuilder.append(file.getFileAbsolutePath()); + + if (it.hasNext()) { + allFileNamesBuilder.append(", "); + } + fileCache.add(file); + } + + allFileNamesBuilder.append(')'); + String totalFileMetricName = allFileNamesBuilder.toString(); + LongCounter totalFileCounter = registerMetric(totalFileMetricName, false); + counterMap.put(totalFileMetricName, registerMetric(totalFileMetricName, false)); + updateMetric(totalFileCounter, totalFiles); + + // tick start + String tickStartMetric = getMetricName("tickStart"); + LongCounter durationCounter = registerMetric(tickStartMetric, false); + counterMap.put(tickStartMetric, durationCounter); + tickStartMetric(durationCounter); + + iter = fileCache.iterator(); } - private Charset getCharacterSet() { - switch (ftpConfig.encoding) { - case "gbk": - return Charset.forName("GBK"); - case "utf-8": + private String getMetricName(String action) { + switch (action) { + case "tickStart": + case "tickEnd": + return FTP_COUNTER_PREFIX + "_read_duration"; + + case "readLines": + return FTP_COUNTER_PREFIX + "_read_lines"; + + case "readBytes": + return FTP_COUNTER_PREFIX + "_read_bytes"; + default: - return StandardCharsets.UTF_8; + throw new RuntimeException("illegal parameter"); + } + } + + public LongCounter registerMetric(String metricName, boolean meterView) { + LongCounter counter = runtimeContext.getLongCounter(metricName); + baseMetric.addMetric(metricName, counter, meterView); + return counter; + } + + public void updateMetric(LongCounter counter, long incr) { + counter.add(incr); + } + + public void setMetricValue(LongCounter counter, Long value) { + counter.resetLocal(); + counter.add(value); + } + + public void tickStartMetric(LongCounter counter) { + if (counter != null) { + counter.resetLocal(); + counter.add(System.currentTimeMillis()); + } + } + + public void tickEndMetric(LongCounter counter) { + if (counter != null) { + Long startTime = counter.getLocalValue(); + counter.resetLocal(); + counter.add(System.currentTimeMillis() - startTime); } } } diff --git a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/source/FtpInputFormat.java b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/source/FtpInputFormat.java index e4e3e9e94e..b888951a46 100644 --- a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/source/FtpInputFormat.java +++ b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/source/FtpInputFormat.java @@ -20,15 +20,17 @@ import com.dtstack.chunjun.conf.FieldConf; import com.dtstack.chunjun.connector.ftp.client.Data; -import com.dtstack.chunjun.connector.ftp.client.FileUtil; import com.dtstack.chunjun.connector.ftp.conf.ConfigConstants; import com.dtstack.chunjun.connector.ftp.conf.FtpConfig; import com.dtstack.chunjun.connector.ftp.converter.FtpColumnConverter; import com.dtstack.chunjun.connector.ftp.converter.FtpRowConverter; -import com.dtstack.chunjun.connector.ftp.format.IFormatConfig; +import com.dtstack.chunjun.connector.ftp.extend.ftp.IFormatConfig; +import com.dtstack.chunjun.connector.ftp.extend.ftp.concurrent.ConcurrentFileSplit; +import com.dtstack.chunjun.connector.ftp.extend.ftp.concurrent.FtpFileSplit; +import com.dtstack.chunjun.connector.ftp.handler.DTFtpHandler; import com.dtstack.chunjun.connector.ftp.handler.FtpHandlerFactory; -import com.dtstack.chunjun.connector.ftp.handler.IFtpHandler; import com.dtstack.chunjun.connector.ftp.handler.Position; +import com.dtstack.chunjun.connector.ftp.spliter.ConcurrentFileSplitFactory; import com.dtstack.chunjun.constants.ConstantValue; import com.dtstack.chunjun.restore.FormatState; import com.dtstack.chunjun.source.format.BaseRichInputFormat; @@ -41,12 +43,11 @@ import org.apache.flink.table.data.RowData; import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang3.StringUtils; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; -import java.util.Comparator; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -65,7 +66,7 @@ public class FtpInputFormat extends BaseRichInputFormat { private transient FtpFileReader reader; - private transient IFtpHandler ftpHandler; + private transient DTFtpHandler ftpHandler; private transient Data data; @@ -86,7 +87,7 @@ public void openInputFormat() throws IOException { @Override public InputSplit[] createInputSplitsInternal(int minNumSplits) throws Exception { - IFtpHandler ftpHandler = FtpHandlerFactory.createFtpHandler(ftpConfig.getProtocol()); + DTFtpHandler ftpHandler = FtpHandlerFactory.createFtpHandler(ftpConfig.getProtocol()); ftpHandler.loginFtpServer(ftpConfig); List files = new ArrayList<>(); @@ -101,39 +102,19 @@ public InputSplit[] createInputSplitsInternal(int minNumSplits) throws Exception } } - List fileList = new ArrayList<>(); + ConcurrentFileSplit spliter = + ConcurrentFileSplitFactory.createConcurrentFileSplit(ftpConfig); + List fileList = + spliter.buildFtpFileSplit(ftpHandler, buildIFormatConfig(ftpConfig), files); - if (CollectionUtils.isNotEmpty(files)) { - for (String filePath : files) { - // add file with compressType - if (org.apache.commons.lang.StringUtils.isNotBlank( - ftpConfig.getCompressType())) { - FileUtil.addCompressFile(ftpHandler, filePath, ftpConfig, fileList); - - } else { - // add normal file - FileUtil.addFile(ftpHandler, filePath, ftpConfig, fileList); - } - } - } - if (CollectionUtils.isEmpty(fileList)) { - throw new RuntimeException("There are no readable files in directory " + path); - } - - FtpInputSplit[] ftpInputSplits = new FtpInputSplit[minNumSplits]; - for (int index = 0; index < minNumSplits; ++index) { + int numSplits = minNumSplits; + FtpInputSplit[] ftpInputSplits = new FtpInputSplit[numSplits]; + for (int index = 0; index < numSplits; ++index) { ftpInputSplits[index] = new FtpInputSplit(); } - // 先根据文件路径排序 - // 再根据文件里面开始的偏移量排序, 从小到大 - fileList.sort( - Comparator.comparing(FtpFileSplit::getFileAbsolutePath) - .thenComparing( - FtpFileSplit::getStartPosition, Comparator.naturalOrder())); - for (int i = 0; i < fileList.size(); ++i) { - ftpInputSplits[i % minNumSplits].getFileSplits().add(fileList.get(i)); + ftpInputSplits[i % numSplits].getFileSplits().add(fileList.get(i)); } return ftpInputSplits; } finally { @@ -142,7 +123,7 @@ public InputSplit[] createInputSplitsInternal(int minNumSplits) throws Exception } @Override - public void openInternal(InputSplit split) { + public void openInternal(InputSplit split) throws IOException { ftpHandler = FtpHandlerFactory.createFtpHandler(ftpConfig.getProtocol()); ftpHandler.loginFtpServer(ftpConfig); @@ -163,7 +144,8 @@ public void openInternal(InputSplit split) { reader.setFromLine(0); } - reader.setIFormatConfig(buildIFormatConfig(ftpConfig)); + reader.setiFormatConfig(buildIFormatConfig(ftpConfig)); + reader.enableMetric(getRuntimeContext(), inputMetric); reader.skipHasReadFiles(); } @@ -174,12 +156,15 @@ public boolean reachedEnd() throws IOException { } @Override - @SuppressWarnings("unchecked") protected RowData nextRecordInternal(RowData rowData) throws ReadRecordException { String[] fields = data.getData(); + if (data.getException() != null) { + throw new ReadRecordException( + data.getException().getMessage(), data.getException(), 0, rowData); + } try { - if (fields.length == 1 && org.apache.commons.lang.StringUtils.isBlank(fields[0])) { + if (fields.length == 1 && StringUtils.isBlank(fields[0])) { LOG.warn("read data:{}, it will not be written.", Arrays.toString(fields)); return null; } @@ -220,7 +205,7 @@ protected RowData nextRecordInternal(RowData rowData) throws ReadRecordException for (int i = 0; i < CollectionUtils.size(columns); i++) { FieldConf fieldConf = columns.get(i); - Object value; + Object value = null; if (fieldConf.getValue() != null) { value = fieldConf.getValue(); } else { @@ -243,7 +228,7 @@ protected RowData nextRecordInternal(RowData rowData) throws ReadRecordException rowData = rowConverter.toInternal(genericRowData); } } catch (Exception e) { - throw new ReadRecordException("Read data error.", e, 0, fields); + throw new ReadRecordException("Read data error.", e, 0, rowData); } position = data.getPosition(); return rowData; @@ -284,12 +269,19 @@ private IFormatConfig buildIFormatConfig(FtpConfig ftpConfig) { iFormatConfig.setFileConfig(ftpConfig.getFileConfig()); final String[] fields = new String[ftpConfig.getColumn().size()]; IntStream.range(0, fields.length) - .forEach(i -> fields[i] = ftpConfig.getColumn().get(i).getName()); + .forEach( + i -> { + fields[i] = ftpConfig.getColumn().get(i).getName(); + }); iFormatConfig.setFields(fields); + iFormatConfig.setFetchMaxSize(ftpConfig.getMaxFetchSize()); + iFormatConfig.setParallelism(ftpConfig.getParallelism()); + iFormatConfig.setColumnDelimiter(ftpConfig.getColumnDelimiter()); + return iFormatConfig; } - private List listFilesInPath(IFtpHandler ftpHandler, String path) throws IOException { + private List listFilesInPath(DTFtpHandler ftpHandler, String path) { path = path.trim(); String fileRegex = path.substring(path.lastIndexOf("/") + 1); boolean isRegex = StringUtils.containsAny(fileRegex, REGEX_CHARS); @@ -343,12 +335,13 @@ public void closeInputFormat() { Map allCounters = inputMetric.getMetricCounters(); Map ftpCounter = new HashMap<>(); - allCounters.forEach( - (key, value) -> { - if (key.startsWith(FTP_COUNTER_PREFIX)) { - ftpCounter.put(key, value.getLocalValue()); - } - }); + allCounters.entrySet().stream() + .forEach( + kv -> { + if (kv.getKey().startsWith(FTP_COUNTER_PREFIX)) { + ftpCounter.put(kv.getKey(), kv.getValue().getLocalValue()); + } + }); PrintUtil.printResult(ftpCounter); diff --git a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/source/FtpInputFormatBuilder.java b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/source/FtpInputFormatBuilder.java index de73b8dc40..5c299cf3e3 100644 --- a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/source/FtpInputFormatBuilder.java +++ b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/source/FtpInputFormatBuilder.java @@ -24,7 +24,6 @@ import org.apache.commons.lang3.StringUtils; -/** @author jiangbo */ public class FtpInputFormatBuilder extends BaseRichInputFormatBuilder { public FtpInputFormatBuilder() { diff --git a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/source/FtpInputSplit.java b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/source/FtpInputSplit.java index 280782b3d0..f047c7bcce 100644 --- a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/source/FtpInputSplit.java +++ b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/source/FtpInputSplit.java @@ -18,6 +18,8 @@ package com.dtstack.chunjun.connector.ftp.source; +import com.dtstack.chunjun.connector.ftp.extend.ftp.concurrent.FtpFileSplit; + import org.apache.flink.core.io.InputSplit; import java.util.ArrayList; diff --git a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/source/FtpInputStreamProxy.java b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/source/FtpInputStreamProxy.java new file mode 100644 index 0000000000..fd4a2918eb --- /dev/null +++ b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/source/FtpInputStreamProxy.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.chunjun.connector.ftp.source; + +import java.io.IOException; +import java.io.InputStream; + +public class FtpInputStreamProxy extends InputStream { + + private InputStream input; + private long readLimit; + private long currentReadBytes; + + public FtpInputStreamProxy(InputStream input, long readLimit) { + this.input = input; + this.readLimit = readLimit; + this.currentReadBytes = 0; + } + + @Override + public int read() throws IOException { + if (currentReadBytes + 1 > readLimit) { + return -1; + } else { + int ch = input.read(); + currentReadBytes += 1; + return ch; + } + } + + @Override + public int read(byte[] b) throws IOException { + return read(b, 0, b.length); + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + if (currentReadBytes + len <= readLimit) { + int ch = input.read(b, off, len); + currentReadBytes += ch; + return ch; + } else { + int maxRead = (int) (readLimit - currentReadBytes); + if (maxRead > 0) { + int ch = input.read(b, off, maxRead); + currentReadBytes += ch; + return ch; + } else { + return -1; + } + } + } + + @Override + public long skip(long n) throws IOException { + return input.skip(n); + } + + @Override + public synchronized void reset() throws IOException { + input.reset(); + } + + @Override + public boolean markSupported() { + return input.markSupported(); + } + + @Override + public int available() throws IOException { + if (currentReadBytes > readLimit) { + return -1; + } + + return input.available(); + } + + @Override + public void close() throws IOException { + input.close(); + } + + @Override + public synchronized void mark(int readlimit) { + input.mark(readlimit); + } + + public long getCurrentReadBytes() { + return currentReadBytes; + } +} diff --git a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/source/FtpSourceFactory.java b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/source/FtpSourceFactory.java index 11b8b985c2..d90e1b9efb 100644 --- a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/source/FtpSourceFactory.java +++ b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/source/FtpSourceFactory.java @@ -18,7 +18,6 @@ package com.dtstack.chunjun.connector.ftp.source; -import com.dtstack.chunjun.conf.FieldConf; import com.dtstack.chunjun.conf.SyncConf; import com.dtstack.chunjun.connector.ftp.conf.ConfigConstants; import com.dtstack.chunjun.connector.ftp.conf.FtpConfig; @@ -35,17 +34,9 @@ import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.RowType; -import java.util.List; -import java.util.stream.Collectors; - -/** - * @program chunjun - * @author: xiuzhu - * @create: 2021/06/19 - */ public class FtpSourceFactory extends SourceFactory { - private final FtpConfig ftpConfig; + private FtpConfig ftpConfig; public FtpSourceFactory(SyncConf syncConf, StreamExecutionEnvironment env) { super(syncConf, env); @@ -61,6 +52,8 @@ public FtpSourceFactory(SyncConf syncConf, StreamExecutionEnvironment env) { String fieldDelimiter = StringUtil.convertRegularExpr(ftpConfig.getFieldDelimiter()); ftpConfig.setFieldDelimiter(fieldDelimiter); } + + ftpConfig.setColumn(syncConf.getReader().getFieldList()); super.initCommonConf(ftpConfig); } @@ -68,16 +61,6 @@ public FtpSourceFactory(SyncConf syncConf, StreamExecutionEnvironment env) { public DataStream createSource() { FtpInputFormatBuilder builder = new FtpInputFormatBuilder(); builder.setFtpConfig(ftpConfig); - List fieldConfList = - ftpConfig.getColumn().stream() - .peek( - fieldConf -> { - if (fieldConf.getName() == null) { - fieldConf.setName(String.valueOf(fieldConf.getIndex())); - } - }) - .collect(Collectors.toList()); - ftpConfig.setColumn(fieldConfList); final RowType rowType = TableUtil.createRowType(ftpConfig.getColumn(), getRawTypeConverter()); builder.setRowConverter(new FtpColumnConverter(rowType, ftpConfig), useAbstractBaseColumn); diff --git a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/spliter/ConcurrentCsvSplit.java b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/spliter/ConcurrentCsvSplit.java new file mode 100644 index 0000000000..105ad39e94 --- /dev/null +++ b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/spliter/ConcurrentCsvSplit.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.chunjun.connector.ftp.spliter; + +import com.dtstack.chunjun.connector.ftp.client.FileUtil; +import com.dtstack.chunjun.connector.ftp.extend.ftp.IFormatConfig; +import com.dtstack.chunjun.connector.ftp.extend.ftp.IFtpHandler; +import com.dtstack.chunjun.connector.ftp.extend.ftp.concurrent.FtpFileSplit; +import com.dtstack.chunjun.throwable.ChunJunRuntimeException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; + +public class ConcurrentCsvSplit extends DefaultFileSplit { + private static final Logger LOG = LoggerFactory.getLogger(ConcurrentCsvSplit.class); + + @Override + public List buildFtpFileSplit( + IFtpHandler handler, IFormatConfig config, List files) { + List fileSplits = new ArrayList<>(); + + for (String filePath : files) { + fileSplits.addAll(analyseSingleFile(handler, config, filePath)); + } + return fileSplits; + } + + public List analyseSingleFile( + IFtpHandler ftpHandler, IFormatConfig config, String filePath) { + List ftpFileSplits = new ArrayList<>(); + + String columnDelimiter = config.getColumnDelimiter(); + try { + long currentFileSize = ftpHandler.getFileSize(filePath); + long fetchSize = Math.max(1024 * 1024, config.getFetchMaxSize()); + String filename = FileUtil.getFilename(filePath); + if (currentFileSize > fetchSize) { + int perSplit = + Math.min( + (int) currentFileSize / config.getParallelism(), Integer.MAX_VALUE); + long startPosition = 0; + long endPosition = startPosition + perSplit; + + while (endPosition <= currentFileSize) { + if (endPosition == currentFileSize) { + FtpFileSplit ftpFileSplit = + new FtpFileSplit(startPosition, endPosition, filePath, filename); + ftpFileSplits.add(ftpFileSplit); + break; + } + + InputStream input = ftpHandler.getInputStreamByPosition(filePath, endPosition); + boolean notMatch = true; + + while (notMatch) { + notMatch = false; + for (char c : columnDelimiter.toCharArray()) { + char ch = (char) input.read(); + endPosition += 1; + + if (ch != c) { + notMatch = true; + break; + } + } + } + + FtpFileSplit ftpFileSplit = + new FtpFileSplit(startPosition, endPosition, filePath, filename); + ftpFileSplits.add(ftpFileSplit); + + LOG.info( + String.format( + "build file split, filename: %s, startPosition: %d, endPosition: %d", + filePath, startPosition, endPosition)); + + startPosition = endPosition; + endPosition = startPosition + perSplit; + } + + if (endPosition != currentFileSize) { + FtpFileSplit ftpFileSplit = + new FtpFileSplit(startPosition, currentFileSize, filePath, filename); + ftpFileSplits.add(ftpFileSplit); + + LOG.info( + String.format( + "build file split, filename: %s, startPosition: %d, endPosition: %d", + filePath, startPosition, currentFileSize)); + } + } else { + ftpFileSplits.add(new FtpFileSplit(0, currentFileSize, filePath, filename)); + } + + ftpFileSplits.sort(compare()); + + return ftpFileSplits; + } catch (Exception e) { + throw new ChunJunRuntimeException(e); + } + } +} diff --git a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/spliter/ConcurrentFileSplitFactory.java b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/spliter/ConcurrentFileSplitFactory.java new file mode 100644 index 0000000000..ee5bb16f76 --- /dev/null +++ b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/spliter/ConcurrentFileSplitFactory.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.chunjun.connector.ftp.spliter; + +import com.dtstack.chunjun.connector.ftp.conf.FtpConfig; +import com.dtstack.chunjun.connector.ftp.extend.ftp.concurrent.ConcurrentFileSplit; +import com.dtstack.chunjun.throwable.ChunJunRuntimeException; + +import org.apache.commons.lang3.StringUtils; + +import java.lang.reflect.Constructor; +import java.util.Locale; + +public class ConcurrentFileSplitFactory { + + public static ConcurrentFileSplit createConcurrentFileSplit(FtpConfig config) { + /** user defined */ + String customSplitClassName = config.getCustomConcurrentFileSplitClassName(); + if (StringUtils.isNotBlank(customSplitClassName)) { + try { + Class clazz = Class.forName(customSplitClassName); + Constructor constructor = clazz.getConstructor(); + return (ConcurrentFileSplit) constructor.newInstance(); + } catch (Exception e) { + throw new ChunJunRuntimeException(e); + } + } + + /** compress file */ + String compressType = config.getCompressType(); + if (StringUtils.isNotBlank(compressType)) { + if (compressType.toUpperCase(Locale.ENGLISH).equals("ZIP")) { + return new ConcurrentZipCompressSplit(); + } else { + throw new ChunJunRuntimeException("not support compress type"); + } + } + + /** normal file, csv, excel, txt */ + String fileType = config.getFileType(); + switch (fileType.toUpperCase(Locale.ENGLISH)) { + case "CSV": + case "TXT": + return new ConcurrentCsvSplit(); + + case "EXCEL": + default: + return new DefaultFileSplit(); + } + } +} diff --git a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/spliter/ConcurrentZipCompressSplit.java b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/spliter/ConcurrentZipCompressSplit.java new file mode 100644 index 0000000000..3a9c5b67cc --- /dev/null +++ b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/spliter/ConcurrentZipCompressSplit.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.chunjun.connector.ftp.spliter; + +import com.dtstack.chunjun.connector.ftp.extend.ftp.IFormatConfig; +import com.dtstack.chunjun.connector.ftp.extend.ftp.IFtpHandler; +import com.dtstack.chunjun.connector.ftp.extend.ftp.concurrent.FtpFileSplit; +import com.dtstack.chunjun.throwable.ChunJunRuntimeException; + +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.List; +import java.util.zip.ZipEntry; +import java.util.zip.ZipInputStream; + +public class ConcurrentZipCompressSplit extends DefaultFileSplit { + @Override + public List buildFtpFileSplit( + IFtpHandler handler, IFormatConfig config, List files) { + List fileSplits = new ArrayList<>(); + for (String filePath : files) { + fileSplits.addAll(analyseSingleFile(handler, filePath, config.getEncoding(), "ZIP")); + } + + fileSplits.sort(compare()); + return fileSplits; + } + + private List analyseSingleFile( + IFtpHandler ftpHandler, String filePath, String encoding, String compressType) { + + List fileSplits = new ArrayList<>(); + try (java.util.zip.ZipInputStream zipInputStream = + new ZipInputStream( + ftpHandler.getInputStream(filePath), Charset.forName(encoding))) { + ZipEntry zipEntry; + + while ((zipEntry = zipInputStream.getNextEntry()) != null) { + fileSplits.add( + new FtpFileSplit( + 0, zipEntry.getSize(), filePath, zipEntry.getName(), compressType)); + } + return fileSplits; + } catch (Exception e) { + throw new ChunJunRuntimeException(e); + } + } +} diff --git a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/spliter/DefaultFileSplit.java b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/spliter/DefaultFileSplit.java new file mode 100644 index 0000000000..988f3c0d34 --- /dev/null +++ b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/spliter/DefaultFileSplit.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.chunjun.connector.ftp.spliter; + +import com.dtstack.chunjun.connector.ftp.client.FileUtil; +import com.dtstack.chunjun.connector.ftp.extend.ftp.IFormatConfig; +import com.dtstack.chunjun.connector.ftp.extend.ftp.IFtpHandler; +import com.dtstack.chunjun.connector.ftp.extend.ftp.concurrent.ConcurrentFileSplit; +import com.dtstack.chunjun.connector.ftp.extend.ftp.concurrent.FtpFileSplit; +import com.dtstack.chunjun.throwable.ChunJunRuntimeException; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; + +public class DefaultFileSplit implements ConcurrentFileSplit { + @Override + public List buildFtpFileSplit( + IFtpHandler handler, IFormatConfig config, List files) { + + List ftpFileSplits = new ArrayList<>(); + + for (String filePath : files) { + try { + String fileName = FileUtil.getFilename(filePath); + long currentFileSize = handler.getFileSize(filePath); + FtpFileSplit fileSplit = new FtpFileSplit(0, currentFileSize, filePath, fileName); + ftpFileSplits.add(fileSplit); + } catch (Exception e) { + throw new ChunJunRuntimeException(e); + } + } + + ftpFileSplits.sort(compare()); + return ftpFileSplits; + } + + /** + * 先根据文件路径排序 再根据文件里面开始的偏移量排序, 从小到大 + * + * @return + */ + @Override + public Comparator compare() { + return Comparator.comparing(FtpFileSplit::getFileAbsolutePath) + .thenComparing( + FtpFileSplit::getStartPosition, + (a, b) -> { + return a.compareTo(b); + }); + } +} diff --git a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/table/FtpDynamicTableFactory.java b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/table/FtpDynamicTableFactory.java index 5f42828d42..32bc4c5a71 100644 --- a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/table/FtpDynamicTableFactory.java +++ b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/table/FtpDynamicTableFactory.java @@ -47,11 +47,6 @@ import java.util.List; import java.util.Set; -/** - * @program chunjun - * @author: xiuzhu - * @create: 2021/06/19 - */ public class FtpDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory { private static final String IDENTIFIER = "ftp-x"; @@ -65,6 +60,7 @@ private static FtpConfig getFtpConfByOptions(ReadableConfig config) { ftpConfig.setPassword(config.get(FtpOptions.PASSWORD)); ftpConfig.setEncoding(config.get(FtpOptions.ENCODING)); + ftpConfig.setFileType(config.get(FtpOptions.FILE_TYPE)); if (config.get(FtpOptions.TIMEOUT) != null) { ftpConfig.setTimeout(config.get(FtpOptions.TIMEOUT)); @@ -80,6 +76,9 @@ private static FtpConfig getFtpConfByOptions(ReadableConfig config) { ftpConfig.setPort(config.get(FtpOptions.PORT)); } + if (config.get(FtpOptions.isFirstLineHeader) != null) { + ftpConfig.setIsFirstLineHeader(config.get(FtpOptions.isFirstLineHeader)); + } return ftpConfig; } @@ -162,6 +161,8 @@ public Set> optionalOptions() { options.add(FtpOptions.ENCODING); options.add(FtpOptions.MAX_FILE_SIZE); options.add(FtpOptions.FORMAT); + options.add(FtpOptions.isFirstLineHeader); + options.add(FtpOptions.FILE_TYPE); return options; } } diff --git a/chunjun-connectors/chunjun-connector-ftp/src/test/java/com/dtstack/chunjun/connector/ftp/format/TextFileFormatTest.java b/chunjun-connectors/chunjun-connector-ftp/src/test/java/com/dtstack/chunjun/connector/ftp/format/TextFileFormatTest.java deleted file mode 100644 index e3b6e7d47c..0000000000 --- a/chunjun-connectors/chunjun-connector-ftp/src/test/java/com/dtstack/chunjun/connector/ftp/format/TextFileFormatTest.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.dtstack.chunjun.connector.ftp.format; - -import com.dtstack.chunjun.connector.ftp.client.File; - -import org.apache.commons.io.input.BrokenInputStream; -import org.junit.Before; -import org.junit.Test; - -import java.io.IOException; -import java.io.InputStream; -import java.util.HashMap; - -import static org.junit.Assert.assertThrows; - -public class TextFileFormatTest { - - private TextFileFormat textFileFormatUnderTest; - - @Before - public void setUp() { - textFileFormatUnderTest = new TextFileFormat(); - } - - @Test - public void testOpen_BrokenInputStream() { - // Setup - final File file = - new File("fileCompressPath", "fileAbsolutePath", "fileName", "compressType"); - final InputStream inputStream = new BrokenInputStream(); - final IFormatConfig config = new IFormatConfig(); - config.setFirstLineHeader(false); - config.setFileConfig(new HashMap<>()); - config.setFieldDelimiter("fieldDelimiter"); - config.setEncoding("encoding"); - config.setFields(new String[] {"fields"}); - - // Run the test - assertThrows( - IOException.class, () -> textFileFormatUnderTest.open(file, inputStream, config)); - } - - @Test - public void testClose() throws Exception { - // Setup - // Run the test - textFileFormatUnderTest.close(); - - // Verify the results - } -} diff --git a/chunjun-e2e/src/test/java/com/dtstack/chunjun/connector/test/utils/ChunjunFlinkStandaloneTestEnvironment.java b/chunjun-e2e/src/test/java/com/dtstack/chunjun/connector/test/utils/ChunjunFlinkStandaloneTestEnvironment.java index 548165b01e..0238e4bdeb 100644 --- a/chunjun-e2e/src/test/java/com/dtstack/chunjun/connector/test/utils/ChunjunFlinkStandaloneTestEnvironment.java +++ b/chunjun-e2e/src/test/java/com/dtstack/chunjun/connector/test/utils/ChunjunFlinkStandaloneTestEnvironment.java @@ -34,7 +34,7 @@ import org.apache.flink.table.api.ValidationException; import com.google.common.collect.Lists; -import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang3.StringUtils; import org.junit.After; import org.junit.Assert; import org.junit.Before;