diff --git a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/table/FtpDynamicTableFactory.java b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/table/FtpDynamicTableFactory.java index dd165dd458..5f42828d42 100644 --- a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/table/FtpDynamicTableFactory.java +++ b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/table/FtpDynamicTableFactory.java @@ -18,6 +18,7 @@ package com.dtstack.chunjun.connector.ftp.table; +import com.dtstack.chunjun.conf.FieldConf; import com.dtstack.chunjun.connector.ftp.conf.FtpConfig; import com.dtstack.chunjun.connector.ftp.options.FtpOptions; import com.dtstack.chunjun.connector.ftp.sink.FtpDynamicTableSink; @@ -38,9 +39,12 @@ import org.apache.flink.table.factories.DynamicTableSourceFactory; import org.apache.flink.table.factories.FactoryUtil; import org.apache.flink.table.factories.SerializationFormatFactory; +import org.apache.flink.table.types.logical.RowType; import org.apache.flink.table.utils.TableSchemaUtils; +import java.util.ArrayList; import java.util.HashSet; +import java.util.List; import java.util.Set; /** @@ -94,7 +98,18 @@ public DynamicTableSource createDynamicTableSource(Context context) { helper.discoverDecodingFormat( DeserializationFormatFactory.class, FtpOptions.FORMAT); + RowType rowType = (RowType) physicalSchema.toRowDataType().getLogicalType(); FtpConfig ftpConfig = getFtpConfByOptions(config); + String[] fieldNames = physicalSchema.getFieldNames(); + List columnList = new ArrayList<>(fieldNames.length); + for (int i = 0; i < fieldNames.length; i++) { + FieldConf field = new FieldConf(); + field.setName(fieldNames[i]); + field.setType(rowType.getTypeAt(i).asSummaryString()); + field.setIndex(i); + columnList.add(field); + } + ftpConfig.setColumn(columnList); return new FtpDynamicTableSource(physicalSchema, ftpConfig, decodingFormat); }