From 5fbb2bdfb27f02b12b500526fc6594163d7d6cc6 Mon Sep 17 00:00:00 2001 From: limengyao0809 Date: Tue, 6 Sep 2022 11:16:05 +0800 Subject: [PATCH 1/3] [hotfix-#1210][ftp] Fix the issue that the connections of SFTP are not released when task failed. --- .../connector/ftp/handler/FtpHandler.java | 5 ++ .../connector/ftp/handler/IFtpHandler.java | 2 +- .../connector/ftp/handler/SftpHandler.java | 5 ++ .../connector/ftp/source/FtpInputFormat.java | 78 +++++++++---------- 4 files changed, 49 insertions(+), 41 deletions(-) diff --git a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/handler/FtpHandler.java b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/handler/FtpHandler.java index c66574d621..192ef9e378 100644 --- a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/handler/FtpHandler.java +++ b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/handler/FtpHandler.java @@ -461,4 +461,9 @@ private boolean isExist(String path) { private String encodePath(String path) throws UnsupportedEncodingException { return new String(path.getBytes(controlEncoding), FTP.DEFAULT_CONTROL_ENCODING); } + + @Override + public void close() throws Exception { + logoutFtpServer(); + } } diff --git a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/handler/IFtpHandler.java b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/handler/IFtpHandler.java index 1d0e0f66bb..6401d8134a 100644 --- a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/handler/IFtpHandler.java +++ b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/handler/IFtpHandler.java @@ -32,7 +32,7 @@ * * @author huyifan.zju@163.com */ -public interface IFtpHandler { +public interface IFtpHandler extends AutoCloseable { /** * 登录服务器 diff --git a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/handler/SftpHandler.java b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/handler/SftpHandler.java index 653534eb06..1c399c4da5 100644 --- a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/handler/SftpHandler.java +++ b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/handler/SftpHandler.java @@ -433,4 +433,9 @@ public long getFileSize(String path) throws IOException { throw new IOException(e); } } + + @Override + public void close() throws Exception { + logoutFtpServer(); + } } diff --git a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/source/FtpInputFormat.java b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/source/FtpInputFormat.java index 07ea4c3520..eadae9cdea 100644 --- a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/source/FtpInputFormat.java +++ b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/source/FtpInputFormat.java @@ -45,7 +45,6 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.Comparator; import java.util.Iterator; import java.util.List; @@ -79,54 +78,53 @@ public void openInputFormat() throws IOException { @Override public InputSplit[] createInputSplitsInternal(int minNumSplits) throws Exception { - IFtpHandler ftpHandler = FtpHandlerFactory.createFtpHandler(ftpConfig.getProtocol()); - ftpHandler.loginFtpServer(ftpConfig); + try (IFtpHandler ftpHandler = FtpHandlerFactory.createFtpHandler(ftpConfig.getProtocol())) { + ftpHandler.loginFtpServer(ftpConfig); - List files = new ArrayList<>(); + List files = new ArrayList<>(); - String path = ftpConfig.getPath(); - if (path != null && path.length() > 0) { - path = path.replace("\n", "").replace("\r", ""); - String[] pathArray = path.split(","); - for (String p : pathArray) { - files.addAll(ftpHandler.getFiles(p.trim())); + String path = ftpConfig.getPath(); + if (path != null && path.length() > 0) { + path = path.replace("\n", "").replace("\r", ""); + String[] pathArray = path.split(","); + for (String p : pathArray) { + files.addAll(ftpHandler.getFiles(p.trim())); + } } - } - - List fileList = new ArrayList<>(); - if (CollectionUtils.isNotEmpty(files)) { - for (String filePath : files) { - if (StringUtils.isNotBlank(ftpConfig.getCompressType())) { - FileUtil.addFile(ftpHandler, filePath, ftpConfig, fileList); - } else { - fileList.add( - new File( - null, - filePath, - filePath.substring(filePath.lastIndexOf("/") + 1), - null)); + List fileList = new ArrayList<>(); + + if (CollectionUtils.isNotEmpty(files)) { + for (String filePath : files) { + if (StringUtils.isNotBlank(ftpConfig.getCompressType())) { + FileUtil.addFile(ftpHandler, filePath, ftpConfig, fileList); + } else { + fileList.add( + new File( + null, + filePath, + filePath.substring(filePath.lastIndexOf("/") + 1), + null)); + } } } - } - if (CollectionUtils.isEmpty(fileList)) { - throw new RuntimeException("There are no readable files in directory " + path); - } - LOG.info("FTP files = {}", GsonUtil.GSON.toJson(fileList)); - int numSplits = (Math.min(files.size(), minNumSplits)); - FtpInputSplit[] ftpInputSplits = new FtpInputSplit[numSplits]; - for (int index = 0; index < numSplits; ++index) { - ftpInputSplits[index] = new FtpInputSplit(); - } + if (CollectionUtils.isEmpty(fileList)) { + throw new RuntimeException("There are no readable files in directory " + path); + } + LOG.info("FTP files = {}", GsonUtil.GSON.toJson(fileList)); + int numSplits = (Math.min(files.size(), minNumSplits)); + FtpInputSplit[] ftpInputSplits = new FtpInputSplit[numSplits]; + for (int index = 0; index < numSplits; ++index) { + ftpInputSplits[index] = new FtpInputSplit(); + } - Collections.sort(fileList, Comparator.comparing(File::getFileAbsolutePath)); + fileList.sort(Comparator.comparing(File::getFileAbsolutePath)); - for (int i = 0; i < fileList.size(); ++i) { - ftpInputSplits[i % numSplits].getPaths().add(fileList.get(i)); + for (int i = 0; i < fileList.size(); ++i) { + ftpInputSplits[i % numSplits].getPaths().add(fileList.get(i)); + } + return ftpInputSplits; } - - ftpHandler.logoutFtpServer(); - return ftpInputSplits; } @Override From 372f57afa67eab62c5dce68d6f78711148912e48 Mon Sep 17 00:00:00 2001 From: limengyao0809 Date: Tue, 6 Sep 2022 12:02:40 +0800 Subject: [PATCH 2/3] [opt-code][ftp] Optimize code of 'Ftp'. --- .../connector/ftp/handler/FtpHandler.java | 59 +++++++++---------- .../connector/ftp/handler/SftpHandler.java | 49 +++++++-------- .../connector/ftp/sink/FtpOutputFormat.java | 18 ++---- 3 files changed, 55 insertions(+), 71 deletions(-) diff --git a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/handler/FtpHandler.java b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/handler/FtpHandler.java index 192ef9e378..4ac822b752 100644 --- a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/handler/FtpHandler.java +++ b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/handler/FtpHandler.java @@ -51,7 +51,6 @@ public class FtpHandler implements IFtpHandler { private static final Logger LOG = LoggerFactory.getLogger(FtpHandler.class); - private static final String DISCONNECT_FAIL_MESSAGE = "Failed to disconnect from ftp server"; private static final String SP = "/"; private FTPClient ftpClient = null; private String controlEncoding; @@ -175,18 +174,7 @@ public List getFiles(String path) { path = path + SP; } try { - FTPFile[] ftpFiles = ftpClient.listFiles(encodePath(path)); - if (ftpFiles != null) { - for (FTPFile ftpFile : ftpFiles) { - // .和..是特殊文件 - if (StringUtils.endsWith(ftpFile.getName(), ConstantValue.POINT_SYMBOL) - || StringUtils.endsWith( - ftpFile.getName(), ConstantValue.TWO_POINT_SYMBOL)) { - continue; - } - sources.addAll(getFiles(path + ftpFile.getName(), ftpFile)); - } - } + listFiles(path, sources); } catch (IOException e) { LOG.error("", e); throw new RuntimeException(e); @@ -194,13 +182,30 @@ public List getFiles(String path) { return sources; } + private void listFiles(String path, List sources) throws IOException { + FTPFile[] ftpFiles = ftpClient.listFiles(encodePath(path)); + if (ftpFiles != null) { + for (FTPFile ftpFile : ftpFiles) { + // .和..是特殊文件 + if (StringUtils.endsWith(ftpFile.getName(), ConstantValue.POINT_SYMBOL) + || StringUtils.endsWith( + ftpFile.getName(), ConstantValue.TWO_POINT_SYMBOL)) { + continue; + } + sources.addAll(getFiles(path + ftpFile.getName(), ftpFile)); + } + } + } + /** * 递归获取指定路径下的所有文件(暂无过滤) * - * @param path - * @param file - * @return - * @throws IOException + * @param path path + * @param file file + * + * @return file list + * + * @throws IOException io exception. */ private List getFiles(String path, FTPFile file) throws IOException { List sources = new ArrayList<>(); @@ -209,17 +214,7 @@ private List getFiles(String path, FTPFile file) throws IOException { path = path + SP; } ftpClient.enterLocalPassiveMode(); - FTPFile[] ftpFiles = ftpClient.listFiles(encodePath(path)); - if (ftpFiles != null) { - for (FTPFile ftpFile : ftpFiles) { - if (StringUtils.endsWith(ftpFile.getName(), ConstantValue.POINT_SYMBOL) - || StringUtils.endsWith( - ftpFile.getName(), ConstantValue.TWO_POINT_SYMBOL)) { - continue; - } - sources.addAll(getFiles(path + ftpFile.getName(), ftpFile)); - } - } + listFiles(path, sources); } else { sources.add(path); } @@ -324,7 +319,7 @@ public void deleteAllFilesInDir(String dir, List exclude) { } if (StringUtils.endsWith(ftpFile.getName(), ConstantValue.POINT_SYMBOL) || StringUtils.endsWith( - ftpFile.getName(), ConstantValue.TWO_POINT_SYMBOL)) { + ftpFile.getName(), ConstantValue.TWO_POINT_SYMBOL)) { continue; } deleteAllFilesInDir(dir + ftpFile.getName(), exclude); @@ -364,8 +359,7 @@ public boolean deleteFile(String filePath) throws IOException { public InputStream getInputStream(String filePath) { try { ftpClient.enterLocalPassiveMode(); - InputStream is = ftpClient.retrieveFileStream(encodePath(filePath)); - return is; + return ftpClient.retrieveFileStream(encodePath(filePath)); } catch (IOException e) { String message = String.format("读取文件 : [%s] 时出错,请确认文件:[%s]存在且配置的用户有权限读取", filePath, filePath); @@ -388,7 +382,7 @@ public List listDirs(String path) { for (FTPFile ftpFile : ftpFiles) { if (StringUtils.endsWith(ftpFile.getName(), ConstantValue.POINT_SYMBOL) || StringUtils.endsWith( - ftpFile.getName(), ConstantValue.TWO_POINT_SYMBOL)) { + ftpFile.getName(), ConstantValue.TWO_POINT_SYMBOL)) { continue; } sources.add(path + ftpFile.getName()); @@ -434,6 +428,7 @@ public long getFileSize(String path) throws IOException { * 判断路径是否存在 * * @param path 判断的路径 + * * @return true 存在 false 不存在 */ private boolean isExist(String path) { diff --git a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/handler/SftpHandler.java b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/handler/SftpHandler.java index 1c399c4da5..9b90215b4c 100644 --- a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/handler/SftpHandler.java +++ b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/handler/SftpHandler.java @@ -55,8 +55,8 @@ public class SftpHandler implements IFtpHandler { private static final String DOT_DOT = ".."; private static final String SP = "/"; private static final String SRC_MAIN = "src/main"; - private static String PATH_NOT_EXIST_ERR = "no such file"; - private static String MSG_AUTH_FAIL = "Auth fail"; + private static final String PATH_NOT_EXIST_ERR = "no such file"; + private static final String MSG_AUTH_FAIL = "Auth fail"; private Session session = null; private ChannelSftp channelSftp = null; @@ -162,7 +162,7 @@ public boolean isDirExist(String directoryPath) { SftpATTRS sftpAttrs = channelSftp.lstat(directoryPath); return sftpAttrs.isDir(); } catch (SftpException e) { - if (e.getMessage().toLowerCase().equals(PATH_NOT_EXIST_ERR)) { + if (e.getMessage().equalsIgnoreCase(PATH_NOT_EXIST_ERR)) { LOG.warn("{}", e.getMessage()); return false; } @@ -174,14 +174,13 @@ public boolean isDirExist(String directoryPath) { @Override public boolean isFileExist(String filePath) { - boolean isExitFlag = false; try { SftpATTRS sftpAttrs = channelSftp.lstat(filePath); if (sftpAttrs.getSize() >= 0) { - isExitFlag = true; + return true; } } catch (SftpException e) { - if (e.getMessage().toLowerCase().equals(PATH_NOT_EXIST_ERR)) { + if (e.getMessage().equalsIgnoreCase(PATH_NOT_EXIST_ERR)) { LOG.warn("{}", e.getMessage()); return false; } else { @@ -190,7 +189,7 @@ public boolean isFileExist(String filePath) { throw new RuntimeException(message, e); } } - return isExitFlag; + return false; } @Override @@ -223,9 +222,9 @@ public List listDirs(String path) { } try { - Vector vector = channelSftp.ls(path); - for (int i = 0; i < vector.size(); ++i) { - ChannelSftp.LsEntry le = (ChannelSftp.LsEntry) vector.get(i); + Vector vector = channelSftp.ls(path); + for (Object o : vector) { + ChannelSftp.LsEntry le = (ChannelSftp.LsEntry) o; String strName = le.getFilename(); if (!strName.equals(DOT) && !strName.equals(SRC_MAIN) @@ -256,9 +255,9 @@ public List getFiles(String path) { path = path + SP; } try { - Vector vector = channelSftp.ls(path); - for (int i = 0; i < vector.size(); ++i) { - ChannelSftp.LsEntry le = (ChannelSftp.LsEntry) vector.get(i); + Vector vector = channelSftp.ls(path); + for (Object o : vector) { + ChannelSftp.LsEntry le = (ChannelSftp.LsEntry) o; String strName = le.getFilename(); if (!strName.equals(DOT) && !strName.equals(SRC_MAIN) @@ -283,14 +282,13 @@ public List getFiles(String path) { public void mkDirRecursive(String directoryPath) { boolean isDirExist = false; try { - this.printWorkingDirectory(); + printWorkingDirectory(); SftpATTRS sftpAttrs = this.channelSftp.lstat(directoryPath); isDirExist = sftpAttrs.isDir(); } catch (SftpException e) { - if (e.getMessage().toLowerCase().equals(PATH_NOT_EXIST_ERR)) { + if (e.getMessage().equalsIgnoreCase(PATH_NOT_EXIST_ERR)) { LOG.warn("{}", e.getMessage()); LOG.warn("Path [{}] does not exist and will be created.", directoryPath); - isDirExist = false; } } if (!isDirExist) { @@ -352,9 +350,9 @@ public void deleteAllFilesInDir(String dir, List exclude) { } try { - Vector vector = channelSftp.ls(dir); - for (int i = 0; i < vector.size(); ++i) { - ChannelSftp.LsEntry le = (ChannelSftp.LsEntry) vector.get(i); + Vector vector = channelSftp.ls(dir); + for (Object o : vector) { + ChannelSftp.LsEntry le = (ChannelSftp.LsEntry) o; String strName = le.getFilename(); if (CollectionUtils.isNotEmpty(exclude) && exclude.contains(strName)) { continue; @@ -397,23 +395,20 @@ public boolean deleteFile(String filePath) throws IOException { return true; } - public boolean mkDirSingleHierarchy(String directoryPath) throws SftpException { - boolean isDirExist = false; + public void mkDirSingleHierarchy(String directoryPath) throws SftpException { + boolean isDirExist; try { SftpATTRS sftpAttrs = this.channelSftp.lstat(directoryPath); isDirExist = sftpAttrs.isDir(); } catch (SftpException e) { - if (!isDirExist) { - LOG.info("Creating a directory step by step [{}]", directoryPath); - this.channelSftp.mkdir(directoryPath); - return true; - } + LOG.info("Creating a directory step by step [{}]", directoryPath); + this.channelSftp.mkdir(directoryPath); + return; } if (!isDirExist) { LOG.info("Creating a directory step by step [{}]", directoryPath); this.channelSftp.mkdir(directoryPath); } - return true; } @Override diff --git a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/sink/FtpOutputFormat.java b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/sink/FtpOutputFormat.java index 4d3ad15521..a82fe3b639 100644 --- a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/sink/FtpOutputFormat.java +++ b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/sink/FtpOutputFormat.java @@ -27,10 +27,10 @@ import com.dtstack.chunjun.throwable.WriteRecordException; import com.dtstack.chunjun.util.ExceptionUtil; -import org.apache.flink.table.data.RowData; - import org.apache.commons.lang.StringUtils; +import org.apache.flink.table.data.RowData; + import java.io.BufferedWriter; import java.io.File; import java.io.IOException; @@ -52,9 +52,6 @@ public class FtpOutputFormat extends BaseFileOutputFormat { private static final int NEWLINE = 10; protected FtpConfig ftpConfig; - protected List columnTypes; - - protected List columnNames; private transient IFtpHandler ftpHandler; @@ -109,6 +106,7 @@ protected void nextBlock() { } @Override + @SuppressWarnings("unchecked") public void writeSingleRecordToFile(RowData rowData) throws WriteRecordException { try { if (writer == null) { @@ -241,7 +239,7 @@ protected long getCurrentFileSize() { @Override protected void writeMultipleRecordsInternal() { - notSupportBatchWrite("FtpWriter"); + throw new UnsupportedOperationException("FtpWriter 不支持批量写入"); } public FtpConfig getFtpConfig() { @@ -252,15 +250,11 @@ public void setFtpConfig(FtpConfig ftpConfig) { this.ftpConfig = ftpConfig; } - protected void notSupportBatchWrite(String writerName) { - throw new UnsupportedOperationException(writerName + "不支持批量写入"); - } - private String handleUserSpecificFileName(String tmpDataFileName, int fileNumber) { String fileName = ftpConfig.getFtpFileName(); if (StringUtils.isNotBlank(fileName)) { if (fileNumber == 1) { - fileName = handlerSingleFile(tmpDataFileName); + fileName = handlerSingleFile(); } else { fileName = handlerMultiChannel(tmpDataFileName); } @@ -270,7 +264,7 @@ private String handleUserSpecificFileName(String tmpDataFileName, int fileNumber return fileName; } - private String handlerSingleFile(String tmpDataFileName) { + private String handlerSingleFile() { return ftpConfig.getFtpFileName(); } From 6ead749cd08e5841fdb14d1cf09d823c8616541c Mon Sep 17 00:00:00 2001 From: limengyao0809 Date: Tue, 6 Sep 2022 13:53:12 +0800 Subject: [PATCH 3/3] code format. --- .../chunjun/connector/ftp/handler/FtpHandler.java | 9 +++------ .../chunjun/connector/ftp/sink/FtpOutputFormat.java | 4 ++-- 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/handler/FtpHandler.java b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/handler/FtpHandler.java index 4ac822b752..689a2443d2 100644 --- a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/handler/FtpHandler.java +++ b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/handler/FtpHandler.java @@ -189,7 +189,7 @@ private void listFiles(String path, List sources) throws IOException { // .和..是特殊文件 if (StringUtils.endsWith(ftpFile.getName(), ConstantValue.POINT_SYMBOL) || StringUtils.endsWith( - ftpFile.getName(), ConstantValue.TWO_POINT_SYMBOL)) { + ftpFile.getName(), ConstantValue.TWO_POINT_SYMBOL)) { continue; } sources.addAll(getFiles(path + ftpFile.getName(), ftpFile)); @@ -202,9 +202,7 @@ private void listFiles(String path, List sources) throws IOException { * * @param path path * @param file file - * * @return file list - * * @throws IOException io exception. */ private List getFiles(String path, FTPFile file) throws IOException { @@ -319,7 +317,7 @@ public void deleteAllFilesInDir(String dir, List exclude) { } if (StringUtils.endsWith(ftpFile.getName(), ConstantValue.POINT_SYMBOL) || StringUtils.endsWith( - ftpFile.getName(), ConstantValue.TWO_POINT_SYMBOL)) { + ftpFile.getName(), ConstantValue.TWO_POINT_SYMBOL)) { continue; } deleteAllFilesInDir(dir + ftpFile.getName(), exclude); @@ -382,7 +380,7 @@ public List listDirs(String path) { for (FTPFile ftpFile : ftpFiles) { if (StringUtils.endsWith(ftpFile.getName(), ConstantValue.POINT_SYMBOL) || StringUtils.endsWith( - ftpFile.getName(), ConstantValue.TWO_POINT_SYMBOL)) { + ftpFile.getName(), ConstantValue.TWO_POINT_SYMBOL)) { continue; } sources.add(path + ftpFile.getName()); @@ -428,7 +426,6 @@ public long getFileSize(String path) throws IOException { * 判断路径是否存在 * * @param path 判断的路径 - * * @return true 存在 false 不存在 */ private boolean isExist(String path) { diff --git a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/sink/FtpOutputFormat.java b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/sink/FtpOutputFormat.java index a82fe3b639..30eb1f27ba 100644 --- a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/sink/FtpOutputFormat.java +++ b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/sink/FtpOutputFormat.java @@ -27,10 +27,10 @@ import com.dtstack.chunjun.throwable.WriteRecordException; import com.dtstack.chunjun.util.ExceptionUtil; -import org.apache.commons.lang.StringUtils; - import org.apache.flink.table.data.RowData; +import org.apache.commons.lang.StringUtils; + import java.io.BufferedWriter; import java.io.File; import java.io.IOException;