diff --git a/chunjun-e2e/src/test/java/com/dtstack/chunjun/connector/containers/ftp/SftpContainer.java b/chunjun-e2e/src/test/java/com/dtstack/chunjun/connector/containers/ftp/SftpContainer.java new file mode 100644 index 0000000000..e8dd6c1c6b --- /dev/null +++ b/chunjun-e2e/src/test/java/com/dtstack/chunjun/connector/containers/ftp/SftpContainer.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.chunjun.connector.containers.ftp; + + +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.wait.strategy.WaitStrategy; +import org.testcontainers.containers.wait.strategy.WaitStrategyTarget; +import org.testcontainers.images.builder.ImageFromDockerfile; + +import java.net.URISyntaxException; +import java.net.URL; +import java.nio.file.Paths; +import java.time.Duration; + +public class SftpContainer extends GenericContainer { + + private static final URL SFTP_DOCKERFILE = + SftpContainer.class + .getClassLoader() + .getResource("docker/ftp/Dockerfile"); + + public SftpContainer(String imageName) throws URISyntaxException { + super( + new ImageFromDockerfile(imageName, true) + .withDockerfile(Paths.get(SFTP_DOCKERFILE.toURI())) + ); + + waitingFor( + new WaitStrategy() { + @Override + public void waitUntilReady(WaitStrategyTarget waitStrategyTarget) { + } + + @Override + public WaitStrategy withStartupTimeout(Duration startupTimeout) { + return null; + } + }); + } + +} diff --git a/chunjun-e2e/src/test/java/com/dtstack/chunjun/connector/test/standalone/ftp/SftpSyncE2eITCase.java b/chunjun-e2e/src/test/java/com/dtstack/chunjun/connector/test/standalone/ftp/SftpSyncE2eITCase.java new file mode 100644 index 0000000000..9c7e74ac07 --- /dev/null +++ b/chunjun-e2e/src/test/java/com/dtstack/chunjun/connector/test/standalone/ftp/SftpSyncE2eITCase.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.chunjun.connector.test.standalone.ftp; + +import com.dtstack.chunjun.connector.containers.ftp.SftpContainer; +import com.dtstack.chunjun.connector.entity.JobAccumulatorResult; +import com.dtstack.chunjun.connector.test.utils.ChunjunFlinkStandaloneTestEnvironment; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.lifecycle.Startables; + +import java.net.URISyntaxException; +import java.time.Duration; +import java.util.stream.Stream; + +public class SftpSyncE2eITCase extends ChunjunFlinkStandaloneTestEnvironment { + private static final Logger LOG = LoggerFactory.getLogger(SftpSyncE2eITCase.class); + + protected static final String sftpImageName = "ftp-e2e-stream"; + + protected SftpContainer sftpContainer; + + private void initContainer() throws URISyntaxException { + sftpContainer = new SftpContainer(sftpImageName); + sftpContainer + .withNetwork(NETWORK) + .withNetworkAliases(sftpImageName) + .withLogConsumer(new Slf4jLogConsumer(LOG)) + .dependsOn(flinkStandaloneContainer); + } + + @Before + public void before() throws Exception { + super.before(); + LOG.info("Starting sftp containers..."); + initContainer(); + Startables.deepStart(Stream.of(sftpContainer)).join(); + Thread.sleep(5000); + LOG.info("sftp Containers are started."); + } + + @After + public void after() { + super.after(); + if (sftpContainer != null) { + sftpContainer.stop(); + } + } + + @Test + public void testFtpToStream() throws Exception { + submitSyncJobOnStandLone( + ChunjunFlinkStandaloneTestEnvironment.CHUNJUN_HOME + + "/chunjun-examples/json/ftp/ftp_stream.json"); + JobAccumulatorResult jobAccumulatorResult = waitUntilJobFinished(Duration.ofMinutes(30)); + Assert.assertEquals(jobAccumulatorResult.getNumRead(), 20); + } +} diff --git a/chunjun-e2e/src/test/resources/docker/ftp/Dockerfile b/chunjun-e2e/src/test/resources/docker/ftp/Dockerfile new file mode 100644 index 0000000000..454a8e24a7 --- /dev/null +++ b/chunjun-e2e/src/test/resources/docker/ftp/Dockerfile @@ -0,0 +1,18 @@ +FROM ubuntu:16.04 + +ADD test.csv /root/ + +RUN apt-get update && apt-get install -y openssh-server +RUN mkdir /var/run/sshd +# Password & Authentication +RUN echo 'root:admin123' | chpasswd +RUN sed -i 's/PermitRootLogin prohibit-password/PermitRootLogin yes/' /etc/ssh/sshd_config + +# SSH & Keeping Session Alive +RUN sed 's@session\s*required\s*pam_loginuid.so@session optional pam_loginuid.so@g' -i /etc/pam.d/sshd + +ENV NOTVISIBLE "in users profile" +RUN echo "export VISIBLE=now" >> /etc/profile + +EXPOSE 22 +CMD ["/usr/sbin/sshd", "-D"] diff --git a/chunjun-e2e/src/test/resources/docker/ftp/test.csv b/chunjun-e2e/src/test/resources/docker/ftp/test.csv new file mode 100644 index 0000000000..5453e1f38e --- /dev/null +++ b/chunjun-e2e/src/test/resources/docker/ftp/test.csv @@ -0,0 +1,20 @@ +1,1 +1,1 +1,1 +1,1 +1,1 +1,1 +1,1 +1,1 +1,1 +1,1 +1,1 +1,1 +1,1 +1,1 +1,1 +1,1 +1,1 +1,1 +1,1 +1,1 diff --git a/chunjun-examples/json/ftp/ftp_ftp.json b/chunjun-examples/json/ftp/ftp_ftp.json deleted file mode 100644 index ea145bccbf..0000000000 --- a/chunjun-examples/json/ftp/ftp_ftp.json +++ /dev/null @@ -1,195 +0,0 @@ -{ - "job":{ - "content":[ - { - "reader":{ - "parameter":{ - "path":"/data/sftp/chunjun/source/data.csv", - "protocol":"sftp", - "port":22, - "isFirstLineHeader":false, - "host":"localhost", - "column":[ - { - "index":0, - "type":"int", - "name": "c_id" - }, - { - "index":1, - "type":"boolean", - "name": "c_boolean" - }, - { - "index":2, - "type":"tinyint", - "name": "c_tinyint" - }, - { - "index":3, - "type":"smallint", - "name": "c_smallint" - }, - { - "index":4, - "type":"int", - "name": "c_int" - }, - { - "index":5, - "type":"bigint", - "name": "c_bigint" - }, - { - "index":6, - "type":"float", - "name": "c_float" - }, - { - "index":7, - "type":"double", - "name": "c_double" - }, - { - "index":8, - "type":"decimal", - "name": "c_decimal" - }, - { - "index":9, - "type":"string", - "name": "c_string" - }, - { - "index":10, - "type":"varchar", - "name": "c_varchar" - }, - { - "index":11, - "type":"char", - "name": "c_char" - }, - { - "index":12, - "type":"timestamp", - "name": "c_timestamp" - }, - { - "index":13, - "type":"date", - "name": "c_date" - } - ], - "password":"xxxxxx", - "fieldDelimiter":",", - "encoding":"utf-8", - "username":"root" - }, - "name":"ftpreader" - }, - "writer":{ - "parameter": { - "path": "/data/sftp/chunjun/sink", - "protocol": "sftp", - "port": 22, - "writeMode": "append", - "host": "localhost", - "column":[ - { - "index":0, - "type":"int", - "name": "c_id" - }, - { - "index":1, - "type":"boolean", - "name": "c_boolean" - }, - { - "index":2, - "type":"tinyint", - "name": "c_tinyint" - }, - { - "index":3, - "type":"smallint", - "name": "c_smallint" - }, - { - "index":4, - "type":"int", - "name": "c_int" - }, - { - "index":5, - "type":"bigint", - "name": "c_bigint" - }, - { - "index":6, - "type":"float", - "name": "c_float" - }, - { - "index":7, - "type":"double", - "name": "c_double" - }, - { - "index":8, - "type":"decimal", - "name": "c_decimal" - }, - { - "index":9, - "type":"string", - "name": "c_string" - }, - { - "index":10, - "type":"varchar", - "name": "c_varchar" - }, - { - "index":11, - "type":"char", - "name": "c_char" - }, - { - "index":12, - "type":"timestamp", - "name": "c_timestamp" - }, - { - "index":13, - "type":"date", - "name": "c_date" - } - ], - "password": "xxxxxx", - "fieldDelimiter": ",", - "encoding": "utf-8", - "username": "root" - }, - "name":"ftpwriter" - } - } - ], - "setting":{ - "restore":{ - "maxRowNumForCheckpoint":0, - "isRestore":false, - "restoreColumnName":"", - "restoreColumnIndex":0 - }, - "errorLimit":{ - "record":100 - }, - "speed":{ - "bytes":0, - "channel":1 - } - } - } -} diff --git a/chunjun-examples/json/ftp/ftp_stream.json b/chunjun-examples/json/ftp/ftp_stream.json new file mode 100644 index 0000000000..8a15ae70c0 --- /dev/null +++ b/chunjun-examples/json/ftp/ftp_stream.json @@ -0,0 +1,74 @@ +{ + "job": { + "content": [ + { + "reader": { + "parameter": { + "isFirstLineHeader": false, + "column": [ + { + "customConverterType": "INT", + "index": 0, + "resourceName": "", + "type": "INT", + "key": 0 + }, + { + "customConverterType": "INT", + "index": 1, + "resourceName": "", + "type": "INT", + "key": 1 + } + ], + "resourceName": "", + "fieldDelimiter": ",", + "encoding": "utf-8", + "path": "/root/test.csv", + "protocol": "SFTP", + "password": "admin123", + "port": 22, + "host": "ftp-e2e-stream", + "fileType": "CSV", + "username": "root" + }, + "name": "ftpreader" + }, + "writer": { + "parameter": { + "column": [ + { + "name": "id", + "type": "id" + }, + { + "name": "id2", + "type": "int" + } + ], + "print": true + }, + "table": { + "tableName": "sinkTable" + }, + "name": "streamwriter" + } + } + ], + "setting": { + "restore": { + "maxRowNumForCheckpoint": 0, + "isRestore": false, + "restoreColumnName": "", + "restoreColumnIndex": 0 + }, + "errorLimit": { + "record": 100 + }, + "speed": { + "bytes": 0, + "channel": 1 + } + } + } +}