Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,33 @@

package com.dtstack.chunjun.connector.ftp.client;

import com.dtstack.chunjun.connector.ftp.extend.ftp.FtpParseException;
import com.dtstack.chunjun.connector.ftp.handler.Position;

/** return from ftpSeqBufferedReader contains line and position */
public class Data {
private String[] data;
private Position position;
private FtpParseException exception;

/**
 * Creates a result that carries only the data and its position; the exception field is left
 * unset ({@code null}).
 *
 * @param data the values stored in this result
 * @param position the position stored alongside the data
 */
public Data(String[] data, Position position) {
    this.position = position;
    this.data = data;
}

/**
 * Creates a result that carries the data, its position and an associated parse exception.
 *
 * @param data the values stored in this result
 * @param position the position stored alongside the data
 * @param exception the parse exception stored alongside the data
 */
public Data(String[] data, Position position, FtpParseException exception) {
    this.exception = exception;
    this.position = position;
    this.data = data;
}

/** @return the data array given to the constructor (the same instance, not a copy) */
public String[] getData() {
    return this.data;
}

/** @return the parse exception given to the constructor, or {@code null} if none was set */
public FtpParseException getException() {
    return this.exception;
}

/** @return the position given to the constructor */
public Position getPosition() {
    return this.position;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,146 +18,14 @@

package com.dtstack.chunjun.connector.ftp.client;

import com.dtstack.chunjun.connector.ftp.conf.FtpConfig;
import com.dtstack.chunjun.connector.ftp.handler.FtpHandler;
import com.dtstack.chunjun.connector.ftp.handler.IFtpHandler;
import com.dtstack.chunjun.connector.ftp.source.FtpFileSplit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.List;
import java.util.Locale;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;

/**
 * Static helpers that turn files reachable through an {@link IFtpHandler} into {@link
 * FtpFileSplit}s.
 */
public class FileUtil {
    private static final Logger LOG = LoggerFactory.getLogger(FileUtil.class);

    /** Lower bound (1 MiB) applied to the configured max fetch size before deciding to split. */
    private static final long MIN_FETCH_SIZE = 1024L * 1024L;

    /**
     * Adds one split per entry of the compressed file at {@code filePath} to {@code fileList}.
     *
     * <p>Only the {@code ZIP} compress type (compared case-insensitively) is supported.
     *
     * @param ftpHandler handler used to open the remote file
     * @param filePath path of the compressed file
     * @param ftpConfig configuration providing the compress type and the entry-name encoding
     * @param fileList list that receives the created splits
     * @throws IOException if reading the archive fails
     * @throws RuntimeException if the compress type is missing or is not ZIP
     */
    public static void addCompressFile(
            IFtpHandler ftpHandler,
            String filePath,
            FtpConfig ftpConfig,
            List<FtpFileSplit> fileList)
            throws IOException {
        String compressType = ftpConfig.getCompressType();
        // A missing compress type is reported as "unsupported" instead of failing with an NPE.
        if (compressType == null || !"ZIP".equals(compressType.toUpperCase(Locale.ENGLISH))) {
            throw new RuntimeException("not support compressType " + compressType);
        }

        try (ZipInputStream zipInputStream =
                new ZipInputStream(
                        ftpHandler.getInputStream(filePath),
                        Charset.forName(ftpConfig.encoding))) {
            ZipEntry zipEntry;
            while ((zipEntry = zipInputStream.getNextEntry()) != null) {
                fileList.add(
                        new FtpFileSplit(
                                0, zipEntry.getSize(), filePath, zipEntry.getName(), compressType));
            }
        }
        // The data stream is closed first and the pending FTP command is completed afterwards,
        // which is the order documented for FTPClient stream transfers.
        closeWithFtpHandler(ftpHandler, LOG);
    }

    /**
     * Completes the pending command of the underlying FTP client when {@code ftpHandler} is an
     * {@link FtpHandler}; other handler types are left untouched.
     *
     * <p>This is best-effort: if completing the command throws, the error is logged and a logout
     * is attempted; a failing logout is logged as well. Nothing is rethrown.
     *
     * @param ftpHandler handler whose pending command should be completed
     * @param log logger that receives the warnings
     */
    public static void closeWithFtpHandler(IFtpHandler ftpHandler, Logger log) {
        if (!(ftpHandler instanceof FtpHandler)) {
            return;
        }
        try {
            ((FtpHandler) ftpHandler).getFtpClient().completePendingCommand();
        } catch (Exception e) {
            log.warn("FTPClient completePendingCommand has error ->", e);
            try {
                ftpHandler.logoutFtpServer();
            } catch (Exception exception) {
                log.warn("FTPClient logout has error ->", exception);
            }
        }
    }

    /**
     * Returns the last {@code /}-separated segment of {@code filepath}.
     *
     * <p>Trailing separators are ignored ({@code "dir/"} yields {@code "dir"}). A path made only
     * of separators, such as {@code "/"}, yields the empty string.
     *
     * @param filepath path using {@code /} as separator
     * @return the last path segment, or {@code ""} if there is none
     */
    public static String getFilename(String filepath) {
        String[] paths = filepath.split("/");
        // split() drops trailing empty segments, so "/" produces an empty array.
        return paths.length == 0 ? "" : paths[paths.length - 1];
    }

    /**
     * Analyses the file at {@code filePath} and adds its split(s) to {@code fileList}.
     *
     * <p>A single split covering the whole file is added when the file type is {@code null},
     * {@code "excel"} or {@code "custom"}, or when the file is not larger than the effective max
     * fetch size (configured value, but at least 1 MiB). Otherwise the file is cut into roughly
     * {@code fileSize / parallelism} sized pieces, each extended to end right after the next
     * {@code '\n'} byte so that no line is cut in half.
     *
     * @param ftpHandler handler used to query the file size and to open the file at an offset
     * @param filePath path of the remote file
     * @param ftpConfig configuration providing max fetch size, parallelism and file type
     * @param fileList list that receives the created splits
     * @throws Exception if the handler fails to read the file
     */
    public static void addFile(
            IFtpHandler ftpHandler,
            String filePath,
            FtpConfig ftpConfig,
            List<FtpFileSplit> fileList)
            throws Exception {
        // fetchSize should be at least 1M
        long maxFetchSize = Math.max(ftpConfig.getMaxFetchSize(), MIN_FETCH_SIZE);

        long currentFileSize = ftpHandler.getFileSize(filePath);
        String filename = getFilename(filePath);

        // excel and custom formats (and an unset file type) are never split
        String fileType = ftpConfig.getFileType();
        boolean splittable =
                fileType != null && !"excel".equals(fileType) && !"custom".equals(fileType);

        if (!splittable || currentFileSize <= maxFetchSize) {
            fileList.add(new FtpFileSplit(0, currentFileSize, filePath, filename));
            return;
        }

        // Divide as long first (casting to int before dividing truncates sizes above 2 GiB),
        // then keep the original Integer.MAX_VALUE cap. Guard against a non-positive
        // parallelism and a zero split size so the loop always makes progress.
        int parallelism = Math.max(1, ftpConfig.getParallelism());
        long perSplit =
                Math.max(1L, Math.min(currentFileSize / parallelism, (long) Integer.MAX_VALUE));

        long startPosition = 0;
        while (startPosition < currentFileSize) {
            long endPosition = startPosition + perSplit;
            if (endPosition < currentFileSize) {
                endPosition = findLineEnd(ftpHandler, filePath, endPosition, currentFileSize);
            } else {
                // Last piece: ends exactly at the end of the file and is added only once.
                endPosition = currentFileSize;
            }

            fileList.add(new FtpFileSplit(startPosition, endPosition, filePath, filename));
            LOG.info(
                    String.format(
                            "build file split, filename: %s, startPosition: %d, endPosition: %d",
                            filePath, startPosition, endPosition));

            startPosition = endPosition;
        }
    }

    /**
     * Returns the offset just past the first {@code '\n'} byte at or after {@code position}, or
     * {@code fileSize} when the stream ends before a newline is found.
     */
    private static long findLineEnd(
            IFtpHandler ftpHandler, String filePath, long position, long fileSize)
            throws Exception {
        // NOTE(review): for FTP handlers the pending command may also need completing after the
        // stream is closed; the original code did not do so, confirm against FtpHandler.
        try (InputStream input = ftpHandler.getInputStreamByPosition(filePath, position)) {
            long lineEnd = position;
            int b;
            // read() returns -1 at end of stream; stop there instead of looping forever.
            while ((b = input.read()) != -1) {
                lineEnd++;
                if (b == '\n') {
                    return lineEnd;
                }
            }
        }
        return fileSize;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,12 @@
import java.util.List;
import java.util.zip.ZipEntry;

/** Zip file stream. If fileNameList is not empty, only the files in fileNameList are read. */
public class ZipInputStream extends InputStream {
private static final Logger LOG = LoggerFactory.getLogger(ZipInputStream.class);

private java.util.zip.ZipInputStream zipInputStream;
private final java.util.zip.ZipInputStream zipInputStream;
private final List<String> fileNameList;
private ZipEntry currentZipEntry;
private List<String> fileNameList;

public ZipInputStream(InputStream in) {
this.zipInputStream = new java.util.zip.ZipInputStream(in);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
public class ExcelReaderExecutor implements Runnable {

private final ExcelReader reader;
private final ExcelSubExceptionCarrier ec;
private ExcelSubExceptionCarrier ec;

public ExcelReaderExecutor(ExcelReader reader, ExcelSubExceptionCarrier ec) {
this.reader = reader;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package com.dtstack.chunjun.connector.ftp.conf;

import com.dtstack.chunjun.conf.BaseFileConf;
import com.dtstack.chunjun.connector.ftp.enums.FileType;
import com.dtstack.chunjun.constants.ConstantValue;

import java.util.Map;
Expand Down Expand Up @@ -47,6 +48,9 @@ public class FtpConfig extends BaseFileConf {

private String ftpFileName;

/** Writing too much data in one batch causes FTP protocol buffer errors, so the default batch size is kept small. */
private long nextCheckRows = 100;

public String encoding = "UTF-8";

/** Null value replacement */
Expand All @@ -58,12 +62,34 @@ public class FtpConfig extends BaseFileConf {
/** User defined format class name */
private String customFormatClassName;

/** User defined split class name */
private String customConcurrentFileSplitClassName;

/* Line delimiter */
private String columnDelimiter = "\n";

/** Get the specified fileReadClient according to the filetype * */
public String fileType;
public String fileType = FileType.TXT.name();

/** Compression format */
public String compressType;

/** @return the current value of the {@code columnDelimiter} field */
public String getColumnDelimiter() {
    return this.columnDelimiter;
}

/**
 * Replaces the value of the {@code columnDelimiter} field.
 *
 * @param columnDelimiter the new delimiter
 */
public void setColumnDelimiter(String columnDelimiter) {
this.columnDelimiter = columnDelimiter;
}

/** @return the user-defined split class name, as stored in this config */
public String getCustomConcurrentFileSplitClassName() {
    return this.customConcurrentFileSplitClassName;
}

/**
 * Stores the user-defined split class name.
 *
 * @param customConcurrentFileSplitClassName the class name to store
 */
public void setCustomConcurrentFileSplitClassName(String customConcurrentFileSplitClassName) {
this.customConcurrentFileSplitClassName = customConcurrentFileSplitClassName;
}

/** @return the user-defined format class name, as stored in this config */
public String getCustomFormatClassName() {
    return this.customFormatClassName;
}
Expand Down Expand Up @@ -238,6 +264,16 @@ public void setMaxFetchSize(long fetchSize) {
this.maxFetchSize = fetchSize;
}

/** @return this config's own {@code nextCheckRows} value */
@Override
public long getNextCheckRows() {
    return this.nextCheckRows;
}

/**
 * Replaces this config's own {@code nextCheckRows} value.
 *
 * @param nextCheckRows the new value
 */
@Override
public void setNextCheckRows(long nextCheckRows) {
this.nextCheckRows = nextCheckRows;
}

/** @return the value last stored in the {@code maxFetchSize} field */
public long getMaxFetchSize() {
    return maxFetchSize;
}
Expand All @@ -259,10 +295,16 @@ public String toString() {
.add("listHiddenFiles=" + listHiddenFiles)
.add("maxFetchSize=" + maxFetchSize)
.add("ftpFileName='" + ftpFileName + "'")
.add("nextCheckRows=" + nextCheckRows)
.add("encoding='" + encoding + "'")
.add("nullIsReplacedWithValue=" + nullIsReplacedWithValue)
.add("fileConfig=" + fileConfig)
.add("customFormatClassName='" + customFormatClassName + "'")
.add(
"customConcurrentFileSplitClassName='"
+ customConcurrentFileSplitClassName
+ "'")
.add("columnDelimiter='" + columnDelimiter + "'")
.add("fileType='" + fileType + "'")
.add("compressType='" + compressType + "'")
.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package com.dtstack.chunjun.connector.ftp.converter;

import com.dtstack.chunjun.conf.FieldConf;
import com.dtstack.chunjun.connector.ftp.conf.FtpConfig;
import com.dtstack.chunjun.converter.AbstractRowConverter;
import com.dtstack.chunjun.converter.IDeserializationConverter;
Expand Down Expand Up @@ -47,26 +46,21 @@
import java.util.ArrayList;
import java.util.List;

/**
* @program chunjun
* @author: xiuzhu
* @create: 2021/06/19
*/
public class FtpColumnConverter extends AbstractRowConverter<RowData, RowData, String, FieldConf> {
public class FtpColumnConverter
extends AbstractRowConverter<RowData, RowData, String, LogicalType> {

private final FtpConfig ftpConfig;

public FtpColumnConverter(RowType rowType, FtpConfig ftpConfig) {
super(rowType);
super(rowType, ftpConfig);
this.ftpConfig = ftpConfig;
for (int i = 0; i < rowType.getFieldCount(); i++) {
FieldConf fieldConf = ftpConfig.getColumn().get(i);
toInternalConverters.add(
wrapIntoNullableInternalConverter(
createInternalConverter(rowType.getTypeAt(i))));
toExternalConverters.add(
wrapIntoNullableExternalConverter(
createExternalConverter(fieldConf), fieldConf));
createExternalConverter(rowType.getTypeAt(i)), rowType.getTypeAt(i)));
}
}

Expand Down Expand Up @@ -96,7 +90,7 @@ public String toExternal(RowData rowData, String output) throws Exception {
StringBuilder sb = new StringBuilder(128);

List<String> columnData = new ArrayList<>(ftpConfig.getColumn().size());
for (int index = 0; index < rowData.getArity(); index++) {
for (int index = 0; index < toExternalConverters.size(); index++) {
toExternalConverters.get(index).serialize(rowData, index, columnData);
if (index != 0) {
sb.append(ftpConfig.getFieldDelimiter());
Expand All @@ -109,7 +103,7 @@ public String toExternal(RowData rowData, String output) throws Exception {
@Override
@SuppressWarnings("unchecked")
protected ISerializationConverter<List<String>> wrapIntoNullableExternalConverter(
ISerializationConverter serializationConverter, FieldConf fieldConf) {
ISerializationConverter serializationConverter, LogicalType logicalType) {
return (rowData, index, list) -> {
if (rowData == null || rowData.isNullAt(index)) {
list.add(index, null);
Expand All @@ -119,6 +113,7 @@ protected ISerializationConverter<List<String>> wrapIntoNullableExternalConverte
};
}

@Override
protected IDeserializationConverter createInternalConverter(LogicalType type) {
switch (type.getTypeRoot()) {
case BOOLEAN:
Expand Down Expand Up @@ -159,8 +154,27 @@ protected IDeserializationConverter createInternalConverter(LogicalType type) {
}

@Override
protected ISerializationConverter<List<String>> createExternalConverter(FieldConf fieldConf) {
return (rowData, index, list) ->
list.add(index, ((ColumnRowData) rowData).getField(index).asString());
/**
 * Builds the serializer that writes one field of a row, as text, into the output list at the
 * field's index.
 *
 * <p>For a {@code ColumnRowData} row, DATE fields are rendered via {@code asSqlDate().toString()}
 * and every other type via {@code asString()}. Any other row is cast to {@code GenericRowData}
 * and the field's {@code toString()} is used.
 *
 * @param logicalType logical type of the field; only its type root is inspected
 * @return the serializer for that field
 */
protected ISerializationConverter<List<String>> createExternalConverter(
        LogicalType logicalType) {
    switch (logicalType.getTypeRoot()) {
        case DATE:
            return (row, pos, out) -> {
                final String text =
                        row instanceof ColumnRowData
                                ? ((ColumnRowData) row).getField(pos).asSqlDate().toString()
                                : ((GenericRowData) row).getField(pos).toString();
                out.add(pos, text);
            };
        default:
            return (row, pos, out) -> {
                final String text =
                        row instanceof ColumnRowData
                                ? ((ColumnRowData) row).getField(pos).asString()
                                : ((GenericRowData) row).getField(pos).toString();
                out.add(pos, text);
            };
    }
}
}
Loading