From f5efa902d15c316be379a81338b4a0f2a492e7eb Mon Sep 17 00:00:00 2001 From: kungen Date: Mon, 1 Aug 2022 10:03:31 +0800 Subject: [PATCH 1/2] [docs][ftp] add ftp docs --- .../ftp/ftp-sink.md" | 256 +++++++++++++++++ .../ftp/ftp-source.md" | 268 ++++++++++++++++++ 2 files changed, 524 insertions(+) create mode 100644 "docs_zh/ChunJun\350\277\236\346\216\245\345\231\250/ftp/ftp-sink.md" create mode 100644 "docs_zh/ChunJun\350\277\236\346\216\245\345\231\250/ftp/ftp-source.md" diff --git "a/docs_zh/ChunJun\350\277\236\346\216\245\345\231\250/ftp/ftp-sink.md" "b/docs_zh/ChunJun\350\277\236\346\216\245\345\231\250/ftp/ftp-sink.md" new file mode 100644 index 0000000000..792874b5f4 --- /dev/null +++ "b/docs_zh/ChunJun\350\277\236\346\216\245\345\231\250/ftp/ftp-sink.md" @@ -0,0 +1,256 @@ +# Ftp Sink + +## 一、介绍 +ftp sink + +## 二、数据源配置 +FTP服务搭建: + + +windows:[地址](https://help.aliyun.com/document_detail/92046.html?spm=a2c4g.11186623.6.1185.6371dcd5DOfc5z) + + +linux:[地址](https://help.aliyun.com/document_detail/92048.html?spm=a2c4g.11186623.6.1184.7a9a2dbcRLDNlf) + + +sftp服务搭建: + + +windows:[地址](http://www.freesshd.com/) + + +linux:[地址](https://yq.aliyun.com/articles/435356?spm=a2c4e.11163080.searchblog.102.576f2ec1BVgWY7) +## 三、插件名称 +| sync | ftpsink, ftpwriter | +| --- | --- | +| sql | ftp-x | + + +## 四、参数说明 + +### 1、sync + +- **path** + - 描述:数据文件路径 + - 必选:是 + - 字段类型:String + - 默认值:无 + + +- **protocol** + - 描述:服务器访问协议,目前支持ftp、sftp + - 必选:是 + - 字段类型:String + - 默认值:无 + + +- **host** + - 描述:ftp服务器地址 + - 必选:是 + - 字段类型:String + - 默认值:无 + + +- **port** + - 描述:ftp服务器端口 + - 必选:否 + - 字段类型:int + - 默认值:若传输协议是sftp协议,默认值是22;若传输协议是标准ftp协议,默认值是21 + + +- **username** + - 描述:ftp服务器登陆用户名 + - 必选:是 + - 字段类型:String + - 默认值:无 + + +- **password** + - 描述:ftp服务器登陆密码 + - 必选:否 + - 字段类型:String + - 默认值:无 + + +- **privateKeyPath** + - 描述:sftp私钥文件路径 + - 必选:否 + - 字段类型:String + - 默认值:无 + + +- **connectPattern** + - 
描述:protocol为ftp时的连接模式,可选PASV和PORT,参数含义可参考:[模式说明](https://blog.csdn.net/qq_16038125/article/details/72851142) + - 必选:否 + - 字段类型:String + - 默认值:PASV + + +- **fieldDelimiter** + - 描述:写入的字段分隔符 + - 必选:否 + - 字段类型:String + - 默认值:',' + + +- **ftpFileName** + - 描述:远程FTP文件系统的文件名,只能写一个文件名。如果配置多并发度(channel > 1),或者文件的大小超过 maxFileSize 配置的值,那么文件会被命名成 filename_0_0, filename_0_1,...,filename_1_0,filename_1_1 。第一个数字是 channel 的编号,第二个数字是拆分后的文件编号。 + - 必选:否 + - 字段类型:String + - 默认值:无 + + +- **timeout** + - 描述:连接超时时间,单位毫秒 + - 必选:否 + - 字段类型:int + - 默认值:5000 + + +- **maxFileSize** + - 描述:写入ftp单个文件最大大小,单位字节 + - 必选:否 + - 字段类型:long + - 默认值:1073741824(1G) + + +- **writeMode** + - 描述:ftpwriter写入前数据清理处理模式 + - append:追加 + - overwrite:覆盖 + - 注意:追加写入是指在目录下追加写入一个文件a,并不是往某个文件追加写入内容;覆盖写入是指覆盖目录下所有文件,overwrite模式时会删除ftp当前目录下的所有文件 + - 必选:否 + - 字段类型:string + - 默认值:append + + +- **column** + - 描述:需要写入的字段 + - 注意:不支持*格式 + - 格式: + ```json + "column": [{ + "name": "col", + "type": "string", + "index":1, + "isPart":false, + "format": "yyyy-MM-dd hh:mm:ss", + "value": "value" + }] + ``` + - 属性说明: + - name:必选,字段名称 + - type:必选,字段类型,需要和数据文件中实际的字段类型匹配 + - index:非必选,字段在所有字段中的位置索引,从0开始计算,默认为-1,按照数组顺序依次读取,配置后读取指定字段列 + - isPart:非必选,是否是分区字段,如果是分区字段,会自动从path上截取分区赋值,默认为false + - format:非必选,按照指定格式,格式化日期 + - value:非必选,常量字段,将value的值作为常量列返回 + - 必选:是 + - 参数类型:数组 + - 默认值:无 + + +### 2、sql + +- **connector** + - 描述:ftp-x + - 必选:是 + - 字段类型:String + - 默认值:无 + + +- **path** + - 描述:文件路径 + - 必选:是 + - 字段类型:String + - 默认值:无 + + +- **file-name** + - 描述:文件名,如果不指定则随机生成 + - 必选:否 + - 字段类型:String + - 默认值:无 + + +- **protocol** + - 描述:服务器访问协议,目前支持ftp、sftp + - 必选:是 + - 字段类型:String + - 默认值:无 + + +- **host** + - 描述:服务器地址 + - 必选:是 + - 字段类型:String + - 默认值:无 + + +- **port** + - 描述:ftp服务器端口 + - 必选:否 + - 字段类型:int + - 默认值:若传输协议是sftp协议,默认值是22;若传输协议是标准ftp协议,默认值是21 + + +- **username** + - 描述:服务器登陆用户名 + - 必选:是 + - 字段类型:String + - 默认值:无 + + +- **password** + - 描述:服务器登陆密码 + - 必选:否 + - 字段类型:String + - 默认值:无 + + +- **format** + - 描述:文件的类型,和原生flink保持一致,支持原生所有类型 + -
必选:否 + - 参数类型:string + - 默认值:csv + + +- **connect-pattern** + - 描述:protocol为ftp时的连接模式,可选PASV和PORT + - 必选:否 + - 字段类型:String + - 默认值:PASV + + +- **timeout** + - 描述:连接超时时间,单位毫秒 + - 必选:否 + - 字段类型:String + - 默认值:5000 + + +- **write-mode** + - 描述:ftpwriter写入前数据清理处理模式 + - append:追加 + - overwrite:覆盖 + - 注意:追加写入是指在目录下追加写入一个文件a,并不是往某个文件追加写入内容;覆盖写入是指覆盖目录下所有文件,overwrite模式时会删除ftp当前目录下的所有文件 + - 必选:否 + - 字段类型:string + - 默认值:append + + +- **max-file-size** + - 描述:写入ftp单个文件最大大小,单位字节 + - 必选:否 + - 字段类型:long + - 默认值:1073741824(1G) + + +## 五、数据类型 +|支持的类型|BOOLEAN, TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, DECIMAL, STRING, VARCHAR, CHAR, BINARY, TIMESTAMP, DATETIME, TIME, DATE| +| --- | --- | +|不支持的类型|ARRAY, MAP, STRUCT| + + +## 六、脚本示例 +见项目内`chunjun-examples`文件夹 diff --git "a/docs_zh/ChunJun\350\277\236\346\216\245\345\231\250/ftp/ftp-source.md" "b/docs_zh/ChunJun\350\277\236\346\216\245\345\231\250/ftp/ftp-source.md" new file mode 100644 index 0000000000..15a1832621 --- /dev/null +++ "b/docs_zh/ChunJun\350\277\236\346\216\245\345\231\250/ftp/ftp-source.md" @@ -0,0 +1,268 @@ +# Ftp Source + +## 一、介绍 +ftp source + +## 二、数据源配置 +FTP服务搭建: + + +windows:[地址](https://help.aliyun.com/document_detail/92046.html?spm=a2c4g.11186623.6.1185.6371dcd5DOfc5z) + + +linux:[地址](https://help.aliyun.com/document_detail/92048.html?spm=a2c4g.11186623.6.1184.7a9a2dbcRLDNlf) + + +sftp服务搭建: + + +windows:[地址](http://www.freesshd.com/) + + +linux:[地址](https://yq.aliyun.com/articles/435356?spm=a2c4e.11163080.searchblog.102.576f2ec1BVgWY7) + +## 三、插件名称 +| sync | ftpsource, ftpreader | +| --- | --- | +| sql | ftp-x | + +## 四、参数说明 + +### 1、sync + +- **path** + - 描述:数据文件路径 + - 必选:是 + - 字段类型:String + - 默认值:无 + + +- **protocol** + - 描述:服务器访问协议,目前支持ftp、sftp + - 必选:是 + - 字段类型:String + - 默认值:无 + + +- **host** + - 描述:ftp服务器地址 + - 必选:是 + - 字段类型:String + - 默认值:无 + + +- **port** + - 描述:ftp服务器端口 + - 必选:否 + - 字段类型:int + - 默认值:若传输协议是sftp协议,默认值是22;若传输协议是标准ftp协议,默认值是21 + + +- **username** + - 描述:ftp服务器登陆用户名 + - 必选:是 + -
字段类型:String + - 默认值:无 + + +- **password** + - 描述:ftp服务器登陆密码 + - 必选:否 + - 字段类型:String + - 默认值:无 + + +- **privateKeyPath** + - 描述:sftp私钥文件路径 + - 必选:否 + - 字段类型:String + - 默认值:无 + + +- **connectPattern** + - 描述:protocol为ftp时的连接模式,可选PASV和PORT,参数含义可参考:[模式说明](https://blog.csdn.net/qq_16038125/article/details/72851142) + - 必选:否 + - 字段类型:String + - 默认值:PASV + + +- **fieldDelimiter** + - 描述:读取的字段分隔符 + - 必选:否 + - 字段类型:String + - 默认值:, + + +- **encoding** + - 描述:读取文件的编码配置 + - 必选:否 + - 字段类型:String + - 默认值:UTF-8 + + +- **controlEncoding** + - 描述:FTP客户端编码格式,当客户端和服务器编码格式不相同时使用 + - 必选:否 + - 字段类型:String + - 默认值:UTF-8 + + +- **isFirstLineHeader** + - 描述:首行是否为标题行,如果是则不读取第一行 + - 必选:否 + - 字段类型:boolean + - 默认值:false + + +- **timeout** + - 描述:连接超时时间,单位毫秒 + - 必选:否 + - 字段类型:String + - 默认值:5000 + + +- **column** + - 描述:需要读取的字段 + - 注意:不支持*格式 + - 格式: + ```json + "column": [{ + "name": "col", + "type": "string", + "index":1, + "isPart":false, + "format": "yyyy-MM-dd hh:mm:ss", + "value": "value" + }] + ``` + + - 属性说明: + - name:必选,字段名称 + - type:必选,字段类型,需要和数据文件中实际的字段类型匹配 + - index:非必选,字段在所有字段中的位置索引,从0开始计算,默认为-1,按照数组顺序依次读取,配置后读取指定字段列 + - isPart:非必选,是否是分区字段,如果是分区字段,会自动从path上截取分区赋值,默认为false + - format:非必选,按照指定格式,格式化日期 + - value:非必选,常量字段,将value的值作为常量列返回 + - 必选:是 + - 参数类型:数组 + - 默认值:无 + + +- **fileType** + - 描述:读取的文件类型,默认取文件后缀名,支持CSV,TXT,EXCEL + - 必选:否 + - 字段类型:string + - 默认值:无 + + +- **compressType** + - 描述:文件压缩类型,目前只支持ZIP压缩格式 + - 必选:否 + - 字段类型:string + - 默认值:无 + + +- **listHiddenFiles** + - 描述:是否展示隐藏文件 + - 必选:否 + - 字段类型:boolean + - 默认值:true + + +- **nullIsReplacedWithValue** + - 描述:当某个字段出现空值时替换 + - 必选:否 + - 字段类型:Object + - 默认值:null + + +- **fileConfig** + - 描述:文件参数配置 + - 必选:否 + - 字段类型:Map + - 默认值:无 + - 示例: + - csv文件是否进行trim:`"fileConfig":{"trimWhitespace":true}` + + +### 2、sql + +- **connector** + - 描述:ftp-x + - 必选:是 + - 字段类型:String + - 默认值:无 + + +- **path** + - 描述:文件路径 + - 必选:是 + - 字段类型:String + - 默认值:无 + + +- **protocol** + - 描述:服务器访问协议,目前支持ftp、sftp + - 必选:是 + - 字段类型:String + - 默认值:无 + + +-
**host** + - 描述:服务器地址 + - 必选:是 + - 字段类型:String + - 默认值:无 + + +- **port** + - 描述:ftp服务器端口 + - 必选:否 + - 字段类型:int + - 默认值:若传输协议是sftp协议,默认值是22;若传输协议是标准ftp协议,默认值是21 + + +- **username** + - 描述:服务器登陆用户名 + - 必选:是 + - 字段类型:String + - 默认值:无 + + +- **password** + - 描述:服务器登陆密码 + - 必选:否 + - 字段类型:String + - 默认值:无 + + +- **format** + - 描述:文件的类型,和原生flink保持一致,支持原生所有类型 + - 必选:否 + - 参数类型:string + - 默认值:csv + + +- **connect-pattern** + - 描述:protocol为ftp时的连接模式,可选PASV和PORT + - 必选:否 + - 字段类型:String + - 默认值:PASV + + +- **timeout** + - 描述:连接超时时间,单位毫秒 + - 必选:否 + - 字段类型:String + - 默认值:5000 + + +## 五、数据类型 +|支持的类型|BOOLEAN, TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, DECIMAL, STRING, VARCHAR, CHAR, BINARY, TIMESTAMP, DATETIME, TIME, DATE| +| --- | --- | +|不支持的类型|ARRAY, MAP, STRUCT| + + + +## 六、配置示例 +见项目内`chunjun-examples`文件夹 From 86d0c82ca13f56a2143d5db2479b05516582127d Mon Sep 17 00:00:00 2001 From: kungen Date: Sun, 9 Oct 2022 16:47:57 +0800 Subject: [PATCH 2/2] [feat-1295][e2e]add ftp container --- .../containers/ftp/SftpContainer.java | 58 ++++++ .../standalone/ftp/SftpSyncE2eITCase.java | 80 +++++++ .../src/test/resources/docker/ftp/Dockerfile | 18 ++ .../src/test/resources/docker/ftp/test.csv | 20 ++ chunjun-examples/json/ftp/ftp_ftp.json | 195 ------------------ chunjun-examples/json/ftp/ftp_stream.json | 74 +++++++ 6 files changed, 250 insertions(+), 195 deletions(-) create mode 100644 chunjun-e2e/src/test/java/com/dtstack/chunjun/connector/containers/ftp/SftpContainer.java create mode 100644 chunjun-e2e/src/test/java/com/dtstack/chunjun/connector/test/standalone/ftp/SftpSyncE2eITCase.java create mode 100644 chunjun-e2e/src/test/resources/docker/ftp/Dockerfile create mode 100644 chunjun-e2e/src/test/resources/docker/ftp/test.csv delete mode 100644 chunjun-examples/json/ftp/ftp_ftp.json create mode 100644 chunjun-examples/json/ftp/ftp_stream.json diff --git a/chunjun-e2e/src/test/java/com/dtstack/chunjun/connector/containers/ftp/SftpContainer.java
b/chunjun-e2e/src/test/java/com/dtstack/chunjun/connector/containers/ftp/SftpContainer.java new file mode 100644 index 0000000000..e8dd6c1c6b --- /dev/null +++ b/chunjun-e2e/src/test/java/com/dtstack/chunjun/connector/containers/ftp/SftpContainer.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.dtstack.chunjun.connector.containers.ftp; + + +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.wait.strategy.WaitStrategy; +import org.testcontainers.containers.wait.strategy.WaitStrategyTarget; +import org.testcontainers.images.builder.ImageFromDockerfile; + +import java.net.URISyntaxException; +import java.net.URL; +import java.nio.file.Paths; +import java.time.Duration; + +public class SftpContainer extends GenericContainer<SftpContainer> { /* container whose image is built from the classpath resource docker/ftp/Dockerfile */ + + private static final URL SFTP_DOCKERFILE = + SftpContainer.class + .getClassLoader() + .getResource("docker/ftp/Dockerfile"); + + public SftpContainer(String imageName) throws URISyntaxException { /* imageName: tag given to the image built from the Dockerfile */ + super( + new ImageFromDockerfile(imageName, true) + .withDockerfile(Paths.get(SFTP_DOCKERFILE.toURI())) + ); + + waitingFor( + new WaitStrategy() { + @Override + public void waitUntilReady(WaitStrategyTarget waitStrategyTarget) { /* deliberate no-op: this strategy never blocks on readiness */ + } + + @Override + public WaitStrategy withStartupTimeout(Duration startupTimeout) { /* the timeout is ignored because nothing is waited for */ + return this; /* fluent setter: hand back this strategy instead of null */ + } + }); + } + +} diff --git a/chunjun-e2e/src/test/java/com/dtstack/chunjun/connector/test/standalone/ftp/SftpSyncE2eITCase.java b/chunjun-e2e/src/test/java/com/dtstack/chunjun/connector/test/standalone/ftp/SftpSyncE2eITCase.java new file mode 100644 index 0000000000..9c7e74ac07 --- /dev/null +++ b/chunjun-e2e/src/test/java/com/dtstack/chunjun/connector/test/standalone/ftp/SftpSyncE2eITCase.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.chunjun.connector.test.standalone.ftp; + +import com.dtstack.chunjun.connector.containers.ftp.SftpContainer; +import com.dtstack.chunjun.connector.entity.JobAccumulatorResult; +import com.dtstack.chunjun.connector.test.utils.ChunjunFlinkStandaloneTestEnvironment; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.lifecycle.Startables; + +import java.net.URISyntaxException; +import java.time.Duration; +import java.util.stream.Stream; + +public class SftpSyncE2eITCase extends ChunjunFlinkStandaloneTestEnvironment { /* e2e case: submits the ftp_stream.json sync job against an SFTP container */ + private static final Logger LOG = LoggerFactory.getLogger(SftpSyncE2eITCase.class); + + protected static final String sftpImageName = "ftp-e2e-stream"; /* used both as image name and as network alias */ + + protected SftpContainer sftpContainer; + + private void initContainer() throws URISyntaxException { + sftpContainer = new SftpContainer(sftpImageName); + sftpContainer + .withNetwork(NETWORK) + .withNetworkAliases(sftpImageName) + .withLogConsumer(new Slf4jLogConsumer(LOG)) + .dependsOn(flinkStandaloneContainer); + } + + @Before + public void before() throws Exception { + super.before(); + LOG.info("Starting sftp containers..."); + initContainer(); + Startables.deepStart(Stream.of(sftpContainer)).join(); + Thread.sleep(5000); /* fixed 5 s pause after the container start call returns */ + LOG.info("sftp Containers are started."); + } + + @After + public void after() { + super.after(); + if (sftpContainer != null) { +
sftpContainer.stop(); + } + } + + @Test + public void testFtpToStream() throws Exception { + submitSyncJobOnStandLone( + ChunjunFlinkStandaloneTestEnvironment.CHUNJUN_HOME + + "/chunjun-examples/json/ftp/ftp_stream.json"); + JobAccumulatorResult jobAccumulatorResult = waitUntilJobFinished(Duration.ofMinutes(30)); + Assert.assertEquals(20, jobAccumulatorResult.getNumRead()); /* JUnit order is (expected, actual); 20 = rows in docker/ftp/test.csv */ + } +} diff --git a/chunjun-e2e/src/test/resources/docker/ftp/Dockerfile b/chunjun-e2e/src/test/resources/docker/ftp/Dockerfile new file mode 100644 index 0000000000..454a8e24a7 --- /dev/null +++ b/chunjun-e2e/src/test/resources/docker/ftp/Dockerfile @@ -0,0 +1,18 @@ +FROM ubuntu:16.04 + +ADD test.csv /root/ + +RUN apt-get update && apt-get install -y openssh-server +RUN mkdir /var/run/sshd +# Password & Authentication +RUN echo 'root:admin123' | chpasswd +RUN sed -i 's/PermitRootLogin prohibit-password/PermitRootLogin yes/' /etc/ssh/sshd_config + +# SSH & Keeping Session Alive +RUN sed 's@session\s*required\s*pam_loginuid.so@session optional pam_loginuid.so@g' -i /etc/pam.d/sshd + +ENV NOTVISIBLE "in users profile" +RUN echo "export VISIBLE=now" >> /etc/profile + +EXPOSE 22 +CMD ["/usr/sbin/sshd", "-D"] diff --git a/chunjun-e2e/src/test/resources/docker/ftp/test.csv b/chunjun-e2e/src/test/resources/docker/ftp/test.csv new file mode 100644 index 0000000000..5453e1f38e --- /dev/null +++ b/chunjun-e2e/src/test/resources/docker/ftp/test.csv @@ -0,0 +1,20 @@ +1,1 +1,1 +1,1 +1,1 +1,1 +1,1 +1,1 +1,1 +1,1 +1,1 +1,1 +1,1 +1,1 +1,1 +1,1 +1,1 +1,1 +1,1 +1,1 +1,1 diff --git a/chunjun-examples/json/ftp/ftp_ftp.json b/chunjun-examples/json/ftp/ftp_ftp.json deleted file mode 100644 index ea145bccbf..0000000000 --- a/chunjun-examples/json/ftp/ftp_ftp.json +++ /dev/null @@ -1,195 +0,0 @@ -{ - "job":{ - "content":[ - { - "reader":{ - "parameter":{ - "path":"/data/sftp/chunjun/source/data.csv", - "protocol":"sftp", - "port":22, - "isFirstLineHeader":false, - "host":"localhost", - "column":[ - { - "index":0, -
"type":"int", - "name": "c_id" - }, - { - "index":1, - "type":"boolean", - "name": "c_boolean" - }, - { - "index":2, - "type":"tinyint", - "name": "c_tinyint" - }, - { - "index":3, - "type":"smallint", - "name": "c_smallint" - }, - { - "index":4, - "type":"int", - "name": "c_int" - }, - { - "index":5, - "type":"bigint", - "name": "c_bigint" - }, - { - "index":6, - "type":"float", - "name": "c_float" - }, - { - "index":7, - "type":"double", - "name": "c_double" - }, - { - "index":8, - "type":"decimal", - "name": "c_decimal" - }, - { - "index":9, - "type":"string", - "name": "c_string" - }, - { - "index":10, - "type":"varchar", - "name": "c_varchar" - }, - { - "index":11, - "type":"char", - "name": "c_char" - }, - { - "index":12, - "type":"timestamp", - "name": "c_timestamp" - }, - { - "index":13, - "type":"date", - "name": "c_date" - } - ], - "password":"xxxxxx", - "fieldDelimiter":",", - "encoding":"utf-8", - "username":"root" - }, - "name":"ftpreader" - }, - "writer":{ - "parameter": { - "path": "/data/sftp/chunjun/sink", - "protocol": "sftp", - "port": 22, - "writeMode": "append", - "host": "localhost", - "column":[ - { - "index":0, - "type":"int", - "name": "c_id" - }, - { - "index":1, - "type":"boolean", - "name": "c_boolean" - }, - { - "index":2, - "type":"tinyint", - "name": "c_tinyint" - }, - { - "index":3, - "type":"smallint", - "name": "c_smallint" - }, - { - "index":4, - "type":"int", - "name": "c_int" - }, - { - "index":5, - "type":"bigint", - "name": "c_bigint" - }, - { - "index":6, - "type":"float", - "name": "c_float" - }, - { - "index":7, - "type":"double", - "name": "c_double" - }, - { - "index":8, - "type":"decimal", - "name": "c_decimal" - }, - { - "index":9, - "type":"string", - "name": "c_string" - }, - { - "index":10, - "type":"varchar", - "name": "c_varchar" - }, - { - "index":11, - "type":"char", - "name": "c_char" - }, - { - "index":12, - "type":"timestamp", - "name": "c_timestamp" - }, - { - "index":13, - "type":"date", - "name": "c_date" - 
} - ], - "password": "xxxxxx", - "fieldDelimiter": ",", - "encoding": "utf-8", - "username": "root" - }, - "name":"ftpwriter" - } - } - ], - "setting":{ - "restore":{ - "maxRowNumForCheckpoint":0, - "isRestore":false, - "restoreColumnName":"", - "restoreColumnIndex":0 - }, - "errorLimit":{ - "record":100 - }, - "speed":{ - "bytes":0, - "channel":1 - } - } - } -} diff --git a/chunjun-examples/json/ftp/ftp_stream.json b/chunjun-examples/json/ftp/ftp_stream.json new file mode 100644 index 0000000000..8a15ae70c0 --- /dev/null +++ b/chunjun-examples/json/ftp/ftp_stream.json @@ -0,0 +1,74 @@ +{ + "job": { + "content": [ + { + "reader": { + "parameter": { + "isFirstLineHeader": false, + "column": [ + { + "customConverterType": "INT", + "index": 0, + "resourceName": "", + "type": "INT", + "key": 0 + }, + { + "customConverterType": "INT", + "index": 1, + "resourceName": "", + "type": "INT", + "key": 1 + } + ], + "resourceName": "", + "fieldDelimiter": ",", + "encoding": "utf-8", + "path": "/root/test.csv", + "protocol": "SFTP", + "password": "admin123", + "port": 22, + "host": "ftp-e2e-stream", + "fileType": "CSV", + "username": "root" + }, + "name": "ftpreader" + }, + "writer": { + "parameter": { + "column": [ + { + "name": "id", + "type": "id" + }, + { + "name": "id2", + "type": "int" + } + ], + "print": true + }, + "table": { + "tableName": "sinkTable" + }, + "name": "streamwriter" + } + } + ], + "setting": { + "restore": { + "maxRowNumForCheckpoint": 0, + "isRestore": false, + "restoreColumnName": "", + "restoreColumnIndex": 0 + }, + "errorLimit": { + "record": 100 + }, + "speed": { + "bytes": 0, + "channel": 1 + } + } + } +}