Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ public class FtpHandler implements IFtpHandler {

private static final Logger LOG = LoggerFactory.getLogger(FtpHandler.class);

private static final String DISCONNECT_FAIL_MESSAGE = "Failed to disconnect from ftp server";
private static final String SP = "/";
private FTPClient ftpClient = null;
private String controlEncoding;
Expand Down Expand Up @@ -175,32 +174,36 @@ public List<String> getFiles(String path) {
path = path + SP;
}
try {
FTPFile[] ftpFiles = ftpClient.listFiles(encodePath(path));
if (ftpFiles != null) {
for (FTPFile ftpFile : ftpFiles) {
// .和..是特殊文件
if (StringUtils.endsWith(ftpFile.getName(), ConstantValue.POINT_SYMBOL)
|| StringUtils.endsWith(
ftpFile.getName(), ConstantValue.TWO_POINT_SYMBOL)) {
continue;
}
sources.addAll(getFiles(path + ftpFile.getName(), ftpFile));
}
}
listFiles(path, sources);
} catch (IOException e) {
LOG.error("", e);
throw new RuntimeException(e);
}
return sources;
}

private void listFiles(String path, List<String> sources) throws IOException {
FTPFile[] ftpFiles = ftpClient.listFiles(encodePath(path));
if (ftpFiles != null) {
for (FTPFile ftpFile : ftpFiles) {
// .和..是特殊文件
if (StringUtils.endsWith(ftpFile.getName(), ConstantValue.POINT_SYMBOL)
|| StringUtils.endsWith(
ftpFile.getName(), ConstantValue.TWO_POINT_SYMBOL)) {
continue;
}
sources.addAll(getFiles(path + ftpFile.getName(), ftpFile));
}
}
}

/**
* 递归获取指定路径下的所有文件(暂无过滤)
*
* @param path
* @param file
* @return
* @throws IOException
* @param path path
* @param file file
* @return file list
* @throws IOException io exception.
*/
private List<String> getFiles(String path, FTPFile file) throws IOException {
List<String> sources = new ArrayList<>();
Expand All @@ -209,17 +212,7 @@ private List<String> getFiles(String path, FTPFile file) throws IOException {
path = path + SP;
}
ftpClient.enterLocalPassiveMode();
FTPFile[] ftpFiles = ftpClient.listFiles(encodePath(path));
if (ftpFiles != null) {
for (FTPFile ftpFile : ftpFiles) {
if (StringUtils.endsWith(ftpFile.getName(), ConstantValue.POINT_SYMBOL)
|| StringUtils.endsWith(
ftpFile.getName(), ConstantValue.TWO_POINT_SYMBOL)) {
continue;
}
sources.addAll(getFiles(path + ftpFile.getName(), ftpFile));
}
}
listFiles(path, sources);
} else {
sources.add(path);
}
Expand Down Expand Up @@ -364,8 +357,7 @@ public boolean deleteFile(String filePath) throws IOException {
public InputStream getInputStream(String filePath) {
try {
ftpClient.enterLocalPassiveMode();
InputStream is = ftpClient.retrieveFileStream(encodePath(filePath));
return is;
return ftpClient.retrieveFileStream(encodePath(filePath));
} catch (IOException e) {
String message =
String.format("读取文件 : [%s] 时出错,请确认文件:[%s]存在且配置的用户有权限读取", filePath, filePath);
Expand Down Expand Up @@ -461,4 +453,9 @@ private boolean isExist(String path) {
private String encodePath(String path) throws UnsupportedEncodingException {
return new String(path.getBytes(controlEncoding), FTP.DEFAULT_CONTROL_ENCODING);
}

@Override
public void close() throws Exception {
logoutFtpServer();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
*
* @author huyifan.zju@163.com
*/
public interface IFtpHandler {
public interface IFtpHandler extends AutoCloseable {

/**
* 登录服务器
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ public class SftpHandler implements IFtpHandler {
private static final String DOT_DOT = "..";
private static final String SP = "/";
private static final String SRC_MAIN = "src/main";
private static String PATH_NOT_EXIST_ERR = "no such file";
private static String MSG_AUTH_FAIL = "Auth fail";
private static final String PATH_NOT_EXIST_ERR = "no such file";
private static final String MSG_AUTH_FAIL = "Auth fail";
private Session session = null;
private ChannelSftp channelSftp = null;

Expand Down Expand Up @@ -162,7 +162,7 @@ public boolean isDirExist(String directoryPath) {
SftpATTRS sftpAttrs = channelSftp.lstat(directoryPath);
return sftpAttrs.isDir();
} catch (SftpException e) {
if (e.getMessage().toLowerCase().equals(PATH_NOT_EXIST_ERR)) {
if (e.getMessage().equalsIgnoreCase(PATH_NOT_EXIST_ERR)) {
LOG.warn("{}", e.getMessage());
return false;
}
Expand All @@ -174,14 +174,13 @@ public boolean isDirExist(String directoryPath) {

@Override
public boolean isFileExist(String filePath) {
boolean isExitFlag = false;
try {
SftpATTRS sftpAttrs = channelSftp.lstat(filePath);
if (sftpAttrs.getSize() >= 0) {
isExitFlag = true;
return true;
}
} catch (SftpException e) {
if (e.getMessage().toLowerCase().equals(PATH_NOT_EXIST_ERR)) {
if (e.getMessage().equalsIgnoreCase(PATH_NOT_EXIST_ERR)) {
LOG.warn("{}", e.getMessage());
return false;
} else {
Expand All @@ -190,7 +189,7 @@ public boolean isFileExist(String filePath) {
throw new RuntimeException(message, e);
}
}
return isExitFlag;
return false;
}

@Override
Expand Down Expand Up @@ -223,9 +222,9 @@ public List<String> listDirs(String path) {
}

try {
Vector vector = channelSftp.ls(path);
for (int i = 0; i < vector.size(); ++i) {
ChannelSftp.LsEntry le = (ChannelSftp.LsEntry) vector.get(i);
Vector<?> vector = channelSftp.ls(path);
for (Object o : vector) {
ChannelSftp.LsEntry le = (ChannelSftp.LsEntry) o;
String strName = le.getFilename();
if (!strName.equals(DOT)
&& !strName.equals(SRC_MAIN)
Expand Down Expand Up @@ -256,9 +255,9 @@ public List<String> getFiles(String path) {
path = path + SP;
}
try {
Vector vector = channelSftp.ls(path);
for (int i = 0; i < vector.size(); ++i) {
ChannelSftp.LsEntry le = (ChannelSftp.LsEntry) vector.get(i);
Vector<?> vector = channelSftp.ls(path);
for (Object o : vector) {
ChannelSftp.LsEntry le = (ChannelSftp.LsEntry) o;
String strName = le.getFilename();
if (!strName.equals(DOT)
&& !strName.equals(SRC_MAIN)
Expand All @@ -283,14 +282,13 @@ public List<String> getFiles(String path) {
public void mkDirRecursive(String directoryPath) {
boolean isDirExist = false;
try {
this.printWorkingDirectory();
printWorkingDirectory();
SftpATTRS sftpAttrs = this.channelSftp.lstat(directoryPath);
isDirExist = sftpAttrs.isDir();
} catch (SftpException e) {
if (e.getMessage().toLowerCase().equals(PATH_NOT_EXIST_ERR)) {
if (e.getMessage().equalsIgnoreCase(PATH_NOT_EXIST_ERR)) {
LOG.warn("{}", e.getMessage());
LOG.warn("Path [{}] does not exist and will be created.", directoryPath);
isDirExist = false;
}
}
if (!isDirExist) {
Expand Down Expand Up @@ -352,9 +350,9 @@ public void deleteAllFilesInDir(String dir, List<String> exclude) {
}

try {
Vector vector = channelSftp.ls(dir);
for (int i = 0; i < vector.size(); ++i) {
ChannelSftp.LsEntry le = (ChannelSftp.LsEntry) vector.get(i);
Vector<?> vector = channelSftp.ls(dir);
for (Object o : vector) {
ChannelSftp.LsEntry le = (ChannelSftp.LsEntry) o;
String strName = le.getFilename();
if (CollectionUtils.isNotEmpty(exclude) && exclude.contains(strName)) {
continue;
Expand Down Expand Up @@ -397,23 +395,20 @@ public boolean deleteFile(String filePath) throws IOException {
return true;
}

public boolean mkDirSingleHierarchy(String directoryPath) throws SftpException {
boolean isDirExist = false;
public void mkDirSingleHierarchy(String directoryPath) throws SftpException {
boolean isDirExist;
try {
SftpATTRS sftpAttrs = this.channelSftp.lstat(directoryPath);
isDirExist = sftpAttrs.isDir();
} catch (SftpException e) {
if (!isDirExist) {
LOG.info("Creating a directory step by step [{}]", directoryPath);
this.channelSftp.mkdir(directoryPath);
return true;
}
LOG.info("Creating a directory step by step [{}]", directoryPath);
this.channelSftp.mkdir(directoryPath);
return;
}
if (!isDirExist) {
LOG.info("Creating a directory step by step [{}]", directoryPath);
this.channelSftp.mkdir(directoryPath);
}
return true;
}

@Override
Expand All @@ -433,4 +428,9 @@ public long getFileSize(String path) throws IOException {
throw new IOException(e);
}
}

@Override
public void close() throws Exception {
logoutFtpServer();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,6 @@ public class FtpOutputFormat extends BaseFileOutputFormat {
private static final int NEWLINE = 10;

protected FtpConfig ftpConfig;
protected List<String> columnTypes;

protected List<String> columnNames;

private transient IFtpHandler ftpHandler;

Expand Down Expand Up @@ -109,6 +106,7 @@ protected void nextBlock() {
}

@Override
@SuppressWarnings("unchecked")
public void writeSingleRecordToFile(RowData rowData) throws WriteRecordException {
try {
if (writer == null) {
Expand Down Expand Up @@ -241,7 +239,7 @@ protected long getCurrentFileSize() {

@Override
protected void writeMultipleRecordsInternal() {
notSupportBatchWrite("FtpWriter");
throw new UnsupportedOperationException("FtpWriter 不支持批量写入");
}

public FtpConfig getFtpConfig() {
Expand All @@ -252,15 +250,11 @@ public void setFtpConfig(FtpConfig ftpConfig) {
this.ftpConfig = ftpConfig;
}

protected void notSupportBatchWrite(String writerName) {
throw new UnsupportedOperationException(writerName + "不支持批量写入");
}

private String handleUserSpecificFileName(String tmpDataFileName, int fileNumber) {
String fileName = ftpConfig.getFtpFileName();
if (StringUtils.isNotBlank(fileName)) {
if (fileNumber == 1) {
fileName = handlerSingleFile(tmpDataFileName);
fileName = handlerSingleFile();
} else {
fileName = handlerMultiChannel(tmpDataFileName);
}
Expand All @@ -270,7 +264,7 @@ private String handleUserSpecificFileName(String tmpDataFileName, int fileNumber
return fileName;
}

private String handlerSingleFile(String tmpDataFileName) {
private String handlerSingleFile() {
return ftpConfig.getFtpFileName();
}

Expand Down
Loading