diff --git a/chunjun-e2e/pom.xml b/chunjun-e2e/pom.xml index 01b8774bcb..709b5c7b82 100644 --- a/chunjun-e2e/pom.xml +++ b/chunjun-e2e/pom.xml @@ -22,6 +22,14 @@ + + com.dtstack.chunjun + chunjun-clients + ${project.version} + test + + + org.testcontainers testcontainers @@ -30,15 +38,15 @@ - com.github.noraui - ojdbc8 - 12.2.0.1 + org.testcontainers + jdbc + ${testcontainers.version} test org.testcontainers - jdbc + oracle-xe ${testcontainers.version} test @@ -58,16 +66,16 @@ - com.dtstack.chunjun - chunjun-clients - ${project.version} + org.postgresql + postgresql + 42.2.19 test - org.postgresql - postgresql - 42.2.19 + com.github.noraui + ojdbc8 + 12.2.0.1 test diff --git a/chunjun-e2e/src/test/java/com/dtstack/chunjun/connector/containers/oracle/OracleContainer.java b/chunjun-e2e/src/test/java/com/dtstack/chunjun/connector/containers/oracle/OracleContainer.java deleted file mode 100644 index 999c88d732..0000000000 --- a/chunjun-e2e/src/test/java/com/dtstack/chunjun/connector/containers/oracle/OracleContainer.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.dtstack.chunjun.connector.containers.oracle; - -import org.testcontainers.containers.JdbcDatabaseContainer; -import org.testcontainers.containers.wait.strategy.WaitStrategy; -import org.testcontainers.containers.wait.strategy.WaitStrategyTarget; -import org.testcontainers.images.builder.ImageFromDockerfile; - -import java.net.URISyntaxException; -import java.net.URL; -import java.nio.file.Paths; -import java.time.Duration; - -public class OracleContainer extends JdbcDatabaseContainer { - private static final URL ORACLE_DOCKERFILE = - OracleContainer.class.getClassLoader().getResource("docker/oracle/Dockerfile"); - - private static final String ORACLE_HOST = "chunjun-e2e-oracle11"; - - private static final String ORACLE_DRIVER_CLASS = "oracle.jdbc.driver.OracleDriver"; - - private static final Integer ORACLE_PORT = 1521; - - private static final String SID = "xe"; - - private static final String USERNAME = "system"; - - private static final String PASSWORD = "oracle"; - - public OracleContainer() throws URISyntaxException { - super( - new ImageFromDockerfile(ORACLE_HOST, true) - .withDockerfile(Paths.get(ORACLE_DOCKERFILE.toURI()))); - withExposedPorts(ORACLE_PORT); - waitingFor( - new WaitStrategy() { - @Override - public void waitUntilReady(WaitStrategyTarget waitStrategyTarget) {} - - @Override - public WaitStrategy withStartupTimeout(Duration startupTimeout) { - return null; - } - }); - } - - @Override - public String getDriverClassName() { - return ORACLE_DRIVER_CLASS; - } - - @Override - public String getJdbcUrl() { - return "jdbc:oracle:thin:" - + this.getUsername() - + "/" - + this.getPassword() - + "@" - + this.getHost() - + ":" - + getMappedPort(ORACLE_PORT) - + ":" - + this.getSid(); - } - - @Override - public String getUsername() { - return USERNAME; - } - - @Override - public String getPassword() { - return PASSWORD; - } - - @Override - public String getTestQueryString() { - return "SELECT 1 FROM DUAL"; - } - - public String getSid() { - return SID; - } - - public Integer getOraclePort() { - return this.getMappedPort(1521); - } -} diff --git a/chunjun-e2e/src/test/java/com/dtstack/chunjun/connector/entity/LaunchCommandBuilder.java b/chunjun-e2e/src/test/java/com/dtstack/chunjun/connector/entity/LaunchCommandBuilder.java index f22f0ba232..7b33839822 100644 --- a/chunjun-e2e/src/test/java/com/dtstack/chunjun/connector/entity/LaunchCommandBuilder.java +++ b/chunjun-e2e/src/test/java/com/dtstack/chunjun/connector/entity/LaunchCommandBuilder.java @@ -19,13 +19,14 @@ package com.dtstack.chunjun.connector.entity; import com.dtstack.chunjun.enums.ClusterMode; -import com.dtstack.chunjun.util.GsonUtil; +import com.google.gson.GsonBuilder; import org.junit.Assert; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; public class LaunchCommandBuilder { private List commands; @@ -55,6 +56,12 @@ public LaunchCommandBuilder withChunJunDistDir(String chunJunDistDir) { return this; } + public LaunchCommandBuilder withFlinkLibDir(String chunJunDistDir) { + commands.add("-flinkLibDir"); + commands.add(chunJunDistDir); + return this; + } + public LaunchCommandBuilder withFlinkConfDir(String flinkConfDir) { commands.add("-flinkConfDir"); commands.add(flinkConfDir); @@ -63,13 +70,13 @@ public LaunchCommandBuilder withFlinkConfDir(String flinkConfDir) { public LaunchCommandBuilder withFlinkCustomConf(Map properties) { commands.add("-confProp"); - commands.add(GsonUtil.GSON.toJson(properties)); + commands.add(new GsonBuilder().create().toJson(properties)); return this; } public LaunchCommandBuilder withAddJar(List path) { commands.add("-addjar"); - commands.add(GsonUtil.GSON.toJson(path)); + commands.add(new GsonBuilder().create().toJson(path)); return this; } @@ -79,6 +86,18 @@ public LaunchCommandBuilder withShipFile(String path) { return this; } + public LaunchCommandBuilder withParameters(Map parameters) { + if (parameters != null && !parameters.isEmpty()) { + commands.add("-p"); + String params = + parameters.entrySet().stream() + .map(entry -> entry.getKey() + "=" + entry.getValue()) + .collect(Collectors.joining(",")); + commands.add(params); + } + return this; + } + public String[] builder() { return commands.toArray(new String[0]); } diff --git a/chunjun-e2e/src/test/java/com/dtstack/chunjun/connector/test/ChunjunFlinkStandaloneE2eTest.java b/chunjun-e2e/src/test/java/com/dtstack/chunjun/connector/test/ChunjunFlinkStandaloneE2eTest.java deleted file mode 100644 index a5bbc56cfe..0000000000 --- a/chunjun-e2e/src/test/java/com/dtstack/chunjun/connector/test/ChunjunFlinkStandaloneE2eTest.java +++ /dev/null @@ -1,260 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.dtstack.chunjun.connector.test; - -import com.dtstack.chunjun.client.Launcher; -import com.dtstack.chunjun.connector.containers.flink.FlinkStandaloneContainer; -import com.dtstack.chunjun.connector.entity.JobAccumulatorResult; -import com.dtstack.chunjun.connector.entity.LaunchCommandBuilder; -import com.dtstack.chunjun.enums.ClusterMode; -import com.dtstack.chunjun.util.GsonUtil; - -import org.apache.flink.api.common.JobStatus; -import org.apache.flink.api.common.time.Deadline; -import org.apache.flink.client.deployment.StandaloneClusterId; -import org.apache.flink.client.program.rest.RestClusterClient; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.RestOptions; -import org.apache.flink.runtime.client.JobStatusMessage; -import org.apache.flink.table.api.ValidationException; - -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import org.apache.commons.lang.StringUtils; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.ClassRule; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.testcontainers.containers.Network; -import org.testcontainers.containers.output.Slf4jLogConsumer; -import org.testcontainers.containers.wait.strategy.WaitStrategy; -import org.testcontainers.containers.wait.strategy.WaitStrategyTarget; -import org.testcontainers.lifecycle.Startables; - -import javax.annotation.Nullable; - -import java.io.File; -import java.net.URISyntaxException; -import java.net.URL; -import java.time.Duration; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.stream.Stream; - -import static org.apache.flink.util.Preconditions.checkState; - -/** - * @author jayce - * @version 1.0 - * @date 2022/8/11 11:18 - */ -public class ChunjunFlinkStandaloneE2eTest { - private static final Logger LOG = LoggerFactory.getLogger(ChunjunFlinkStandaloneE2eTest.class); - - public static final String CHUNJUN_HOME = - new File(System.getProperty("user.dir")).getParentFile().getAbsolutePath(); - - public static final URL FLINK_CONF_DIR_URL = - ChunjunFlinkStandaloneE2eTest.class - .getClassLoader() - .getResource("docker/flink/standalone"); - - public static final String CHUNJUN_DIST = - new File(System.getProperty("user.dir")).getParentFile().getAbsolutePath() - + "/chunjun-dist"; - - private static final String FLINK_STANDALONE_HOST = "standalone"; - - public static final int JOB_MANAGER_REST_PORT = 8081; - - public static final int JOB_MANAGER_RPC_PORT = 6213; - - @ClassRule public static final Network NETWORK = Network.newNetwork(); - - protected FlinkStandaloneContainer flinkStandaloneContainer; - - @Nullable private RestClusterClient restClusterClient; - - @Before - public void before() throws URISyntaxException { - Assert.assertTrue("chunjun-dist directory must exists", new File(CHUNJUN_DIST).exists()); - - LOG.info("Starting flink standalone containers..."); - - flinkStandaloneContainer = - new FlinkStandaloneContainer(FLINK_STANDALONE_HOST) - .withCommand(FLINK_STANDALONE_HOST) - .withNetwork(NETWORK) - .withNetworkAliases(FLINK_STANDALONE_HOST) - .withExposedPorts(JOB_MANAGER_REST_PORT, JOB_MANAGER_RPC_PORT) - .withFileSystemBind(CHUNJUN_DIST, CHUNJUN_DIST) - .withLogConsumer(new Slf4jLogConsumer(LOG)) - .waitingFor( - new WaitStrategy() { - @Override - public void waitUntilReady( - WaitStrategyTarget waitStrategyTarget) {} - - @Override - public WaitStrategy withStartupTimeout( - Duration startupTimeout) { - return null; - } - }); - - Startables.deepStart(Stream.of(flinkStandaloneContainer)).join(); - LOG.info("Containers are started."); - } - - @After - public void after() { - if (restClusterClient != null) { - restClusterClient.close(); - } - - if (flinkStandaloneContainer != null) { - flinkStandaloneContainer.stop(); - } - } - - public RestClusterClient getRestClusterClient() { - if (restClusterClient != null) { - return restClusterClient; - } - checkState( - flinkStandaloneContainer.isRunning(), - "Cluster client should only be retrieved for a running cluster"); - try { - final Configuration clientConfiguration = new Configuration(); - clientConfiguration.set(RestOptions.ADDRESS, flinkStandaloneContainer.getHost()); - clientConfiguration.set( - RestOptions.PORT, - flinkStandaloneContainer.getMappedPort(JOB_MANAGER_REST_PORT)); - this.restClusterClient = - new RestClusterClient<>(clientConfiguration, StandaloneClusterId.getInstance()); - } catch (Exception e) { - throw new IllegalStateException( - "Failed to create client for Flink container cluster", e); - } - return restClusterClient; - } - - protected void submitSyncJobOnStandLone(String syncConf) throws Exception { - HashMap customProperties = Maps.newHashMap(); - customProperties.put( - "jobmanager.rpc.port", - flinkStandaloneContainer.getMappedPort(JOB_MANAGER_RPC_PORT)); - customProperties.put( - "rest.port", flinkStandaloneContainer.getMappedPort(JOB_MANAGER_REST_PORT)); - - String[] syncs = - new LaunchCommandBuilder("sync") - .withFlinkConfDir(FLINK_CONF_DIR_URL.toURI().getPath()) - .withRunningMode(ClusterMode.standalone) - .withJobContentPath(syncConf) - .withChunJunDistDir(CHUNJUN_DIST) - .withFlinkCustomConf(customProperties) - .builder(); - Launcher.main(syncs); - } - - public JobAccumulatorResult waitUntilJobFinished(Duration timeout) - throws ExecutionException, InterruptedException { - RestClusterClient clusterClient = getRestClusterClient(); - Deadline deadline = Deadline.fromNow(timeout); - int i = 0; - while (deadline.hasTimeLeft()) { - Collection jobStatusMessages; - try { - jobStatusMessages = clusterClient.listJobs().get(10, TimeUnit.SECONDS); - } catch (Exception e) { - if (i++ > 10) { - throw new RuntimeException("Error when fetching job status.", e); - } - Thread.sleep(5000L); - LOG.warn("Error when fetching job status.", e); - continue; - } - - if (jobStatusMessages != null && !jobStatusMessages.isEmpty()) { - JobStatusMessage message = jobStatusMessages.iterator().next(); - JobStatus jobStatus = message.getJobState(); - if (jobStatus.isTerminalState()) { - if (message.getJobState().equals(JobStatus.FINISHED)) { - CompletableFuture> accumulators = - clusterClient.getAccumulators(message.getJobId()); - Map data = accumulators.get(); - return printResult(data); - } - throw new ValidationException( - String.format( - "Job has been terminated! JobName: %s, JobID: %s, Status: %s", - message.getJobName(), - message.getJobId(), - message.getJobState())); - } else if (jobStatus == JobStatus.FINISHED) { - CompletableFuture> accumulators = - clusterClient.getAccumulators(message.getJobId()); - Map data = accumulators.get(); - return printResult(data); - } - } - } - throw new RuntimeException("wait job finished timeout"); - } - - public JobAccumulatorResult printResult(Map result) { - List names = Lists.newArrayList(); - List values = Lists.newArrayList(); - result.forEach( - (name, val) -> { - names.add(name); - values.add(String.valueOf(val)); - }); - - int maxLength = 0; - for (String name : names) { - maxLength = Math.max(maxLength, name.length()); - } - maxLength += 5; - - StringBuilder builder = new StringBuilder(128); - builder.append("\n*********************************************\n"); - for (int i = 0; i < names.size(); i++) { - String name = names.get(i); - builder.append(name + StringUtils.repeat(" ", maxLength - name.length())); - builder.append("| ").append(values.get(i)); - - if (i + 1 < names.size()) { - builder.append("\n"); - } - } - builder.append("\n*********************************************\n"); - LOG.info(builder.toString()); - - return GsonUtil.GSON.fromJson(GsonUtil.GSON.toJson(result), JobAccumulatorResult.class); - } -} diff --git a/chunjun-e2e/src/test/java/com/dtstack/chunjun/connector/test/FlinkStandaloneStreamE2eTests.java b/chunjun-e2e/src/test/java/com/dtstack/chunjun/connector/test/FlinkStandaloneStreamE2eTests.java deleted file mode 100644 index 15814af1ec..0000000000 --- a/chunjun-e2e/src/test/java/com/dtstack/chunjun/connector/test/FlinkStandaloneStreamE2eTests.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.dtstack.chunjun.connector.test; - -import com.dtstack.chunjun.connector.entity.JobAccumulatorResult; - -import org.junit.Assert; -import org.junit.Test; - -import java.time.Duration; - -public class FlinkStandaloneStreamE2eTests extends ChunjunFlinkStandaloneE2eTest { - - @Test - public void testStream() throws Exception { - submitSyncJobOnStandLone(CHUNJUN_HOME + "/chunjun-examples/json/stream/stream.json"); - JobAccumulatorResult jobAccumulatorResult = waitUntilJobFinished(Duration.ofMinutes(30)); - - Assert.assertEquals(jobAccumulatorResult.getNumRead(), 30); - } -} diff --git a/chunjun-e2e/src/test/java/com/dtstack/chunjun/connector/test/standalone/oracle/sync/OracleSyncE2eITCase.java b/chunjun-e2e/src/test/java/com/dtstack/chunjun/connector/test/standalone/oracle/sync/OracleSyncE2eITCase.java index d1029e3cbc..d13833c704 100644 --- a/chunjun-e2e/src/test/java/com/dtstack/chunjun/connector/test/standalone/oracle/sync/OracleSyncE2eITCase.java +++ b/chunjun-e2e/src/test/java/com/dtstack/chunjun/connector/test/standalone/oracle/sync/OracleSyncE2eITCase.java @@ -18,7 +18,6 @@ package com.dtstack.chunjun.connector.test.standalone.oracle.sync; -import com.dtstack.chunjun.connector.containers.oracle.OracleContainer; import com.dtstack.chunjun.connector.entity.JobAccumulatorResult; import com.dtstack.chunjun.connector.test.utils.ChunjunFlinkStandaloneTestEnvironment; import com.dtstack.chunjun.connector.test.utils.JdbcProxy; @@ -28,6 +27,7 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.testcontainers.containers.OracleContainer; import org.testcontainers.containers.output.Slf4jLogConsumer; import org.testcontainers.lifecycle.Startables; import org.testcontainers.shaded.org.apache.commons.lang.StringUtils; @@ -49,10 +49,14 @@ public class OracleSyncE2eITCase extends ChunjunFlinkStandaloneTestEnvironment { private static final Logger LOG = LoggerFactory.getLogger(OracleSyncE2eITCase.class); - private static final URL ORACLE_INIT_SQL_URL = + public static final URL ORACLE_INIT_SQL_URL = OracleSyncE2eITCase.class.getClassLoader().getResource("docker/oracle/init.sql"); - protected static final String ORACLE_HOST = "chunjun-e2e-oracle"; + private static final String ORACLE_TEST_USER = "dbzuser"; + private static final String ORACLE_TEST_PASSWORD = "dbz"; + private static final String ORACLE_DRIVER_CLASS = "oracle.jdbc.driver.OracleDriver"; + private static final String INTER_CONTAINER_ORACLE_ALIAS = "chunjun-e2e-oracle"; + private static final String ORACLE_IMAGE = "jark/oracle-xe-11g-r2-cdc:0.1"; public OracleContainer oracle; @@ -60,12 +64,12 @@ public class OracleSyncE2eITCase extends ChunjunFlinkStandaloneTestEnvironment { public void before() throws Exception { super.before(); LOG.info("Starting containers..."); - oracle = new OracleContainer(); - oracle.withNetwork(NETWORK); - oracle.withNetworkAliases(ORACLE_HOST); - oracle.withLogConsumer(new Slf4jLogConsumer(LOG)); + oracle = + new OracleContainer(ORACLE_IMAGE) + .withNetwork(NETWORK) + .withNetworkAliases(INTER_CONTAINER_ORACLE_ALIAS) + .withLogConsumer(new Slf4jLogConsumer(LOG)); Startables.deepStart(Stream.of(oracle)).join(); - Thread.sleep(5000); initOracle(); LOG.info("Containers are started."); } @@ -91,9 +95,9 @@ public void testOracleToOracle() throws Exception { JdbcProxy proxy = new JdbcProxy( oracle.getJdbcUrl(), - oracle.getUsername(), - oracle.getPassword(), - oracle.getDriverClassName()); + ORACLE_TEST_USER, + ORACLE_TEST_PASSWORD, + ORACLE_DRIVER_CLASS); List expectResult = Arrays.asList( "1,4086.104923538155,2095-02-04 15:59:22.0,2022-08-03 14:11:12.651,FdTY,Abc,Hello", @@ -108,7 +112,7 @@ public void testOracleToOracle() throws Exception { "11,9036.620205198631,2040-03-20 13:40:13.0,2022-08-03 14:11:12.671,l4bezLJ,Abc,Hello"); proxy.checkResultWithTimeout( expectResult, - "SYSTEM.TEST_SINK", + "debezium.TEST_SINK", new String[] { "INT_VAL", "DOUBLE_VAL", @@ -121,6 +125,33 @@ public void testOracleToOracle() throws Exception { 150000L); } + @Test + public void testLogminerToOracle() throws Exception { + submitSyncJobOnStandLone( + ChunjunFlinkStandaloneTestEnvironment.CHUNJUN_HOME + + "/chunjun-examples/json/logminer/logminer_oracle_to_oracle.json"); + waitUntilJobRunning(Duration.ofSeconds(30L)); + + JdbcProxy proxy = + new JdbcProxy( + oracle.getJdbcUrl(), + ORACLE_TEST_USER, + ORACLE_TEST_PASSWORD, + ORACLE_DRIVER_CLASS); + List expectResult = + Arrays.asList( + "1000010,大海,中国人", + "1000011,大海,中国人", + "1000012,大海,中国人", + "1000013,大海,中国人", + "1000014,大海,中国人"); + proxy.checkResultWithTimeout( + expectResult, + "debezium.ORACLE_TEST_LOGMINER", + new String[] {"TEST_INT", "TEST_VARCHAR", "TEST_CHAR"}, + 150000L); + } + private void initOracle() throws IOException, SQLException { String initSqls = FileUtils.readFileToString(new File(ORACLE_INIT_SQL_URL.getPath()), "UTF-8"); @@ -140,7 +171,11 @@ private void initOracle() throws IOException, SQLException { } private Connection getOracleJdbcConnection() throws SQLException { + // we need to set this property, otherwise Azure Pipeline will complain + // "ORA-01882: timezone region not found" error when building the Oracle JDBC connection + // see https://stackoverflow.com/a/9177263/4915129 + System.setProperty("oracle.jdbc.timezoneAsRegion", "false"); return DriverManager.getConnection( - oracle.getJdbcUrl(), oracle.getUsername(), oracle.getPassword()); + oracle.getJdbcUrl(), ORACLE_TEST_USER, ORACLE_TEST_PASSWORD); } } diff --git a/chunjun-e2e/src/test/java/com/dtstack/chunjun/connector/test/standalone/postgre/sync/PostgreSyncE2eITCase.java b/chunjun-e2e/src/test/java/com/dtstack/chunjun/connector/test/standalone/postgre/sync/PostgreSyncE2eITCase.java index 0e20c1f1e1..c83b531c73 100644 --- a/chunjun-e2e/src/test/java/com/dtstack/chunjun/connector/test/standalone/postgre/sync/PostgreSyncE2eITCase.java +++ b/chunjun-e2e/src/test/java/com/dtstack/chunjun/connector/test/standalone/postgre/sync/PostgreSyncE2eITCase.java @@ -78,7 +78,7 @@ public void after() { } @Test - public void testOracleToOracle() throws Exception { + public void testPostgreToPostgre() throws Exception { submitSyncJobOnStandLone( ChunjunFlinkStandaloneTestEnvironment.CHUNJUN_HOME + "/chunjun-examples/json/postgresql/postgre_postgre.json"); diff --git a/chunjun-e2e/src/test/java/com/dtstack/chunjun/connector/test/utils/ChunjunFlinkStandaloneTestEnvironment.java b/chunjun-e2e/src/test/java/com/dtstack/chunjun/connector/test/utils/ChunjunFlinkStandaloneTestEnvironment.java index 14cf1d34f4..548165b01e 100644 --- a/chunjun-e2e/src/test/java/com/dtstack/chunjun/connector/test/utils/ChunjunFlinkStandaloneTestEnvironment.java +++ b/chunjun-e2e/src/test/java/com/dtstack/chunjun/connector/test/utils/ChunjunFlinkStandaloneTestEnvironment.java @@ -18,7 +18,6 @@ package com.dtstack.chunjun.connector.test.utils; -import com.dtstack.chunjun.client.Launcher; import com.dtstack.chunjun.connector.containers.flink.FlinkStandaloneContainer; import com.dtstack.chunjun.connector.entity.JobAccumulatorResult; import com.dtstack.chunjun.connector.entity.LaunchCommandBuilder; @@ -35,7 +34,6 @@ import org.apache.flink.table.api.ValidationException; import com.google.common.collect.Lists; -import com.google.common.collect.Maps; import org.apache.commons.lang.StringUtils; import org.junit.After; import org.junit.Assert; @@ -43,9 +41,11 @@ import org.junit.ClassRule; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.testcontainers.containers.Container; import org.testcontainers.containers.Network; import org.testcontainers.containers.output.Slf4jLogConsumer; import org.testcontainers.lifecycle.Startables; +import org.testcontainers.utility.MountableFile; import javax.annotation.Nullable; @@ -53,8 +53,6 @@ import java.net.URL; import java.time.Duration; import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -73,7 +71,8 @@ public class ChunjunFlinkStandaloneTestEnvironment { private static final Logger LOG = LoggerFactory.getLogger(ChunjunFlinkStandaloneTestEnvironment.class); - public static final String CHUNJUN_HOME = System.getProperty("user.dir") + "/.."; + public static final String CHUNJUN_HOME = + new File(System.getProperty("user.dir")).getParentFile().getAbsolutePath(); public static final URL FLINK_CONF_DIR_URL = ChunjunFlinkStandaloneTestEnvironment.class @@ -84,6 +83,16 @@ public class ChunjunFlinkStandaloneTestEnvironment { new File(System.getProperty("user.dir")).getParentFile().getAbsolutePath() + "/chunjun-dist"; + public static final String CHUNJUN_LIB = + new File(System.getProperty("user.dir")).getParentFile().getAbsolutePath() + "/lib"; + + public static final String CHUNJUN_BIN = + new File(System.getProperty("user.dir")).getParentFile().getAbsolutePath() + "/bin"; + + public static final String CHUNJUN_EXAMPLES = + new File(System.getProperty("user.dir")).getParentFile().getAbsolutePath() + + "/chunjun-examples"; + private static final String FLINK_STANDALONE_HOST = "standalone"; public static final int JOB_MANAGER_REST_PORT = 8081; @@ -108,9 +117,12 @@ public void before() throws Exception { .withNetworkAliases(FLINK_STANDALONE_HOST) .withExposedPorts(JOB_MANAGER_REST_PORT, JOB_MANAGER_RPC_PORT) .withFileSystemBind(CHUNJUN_DIST, CHUNJUN_DIST) + .withFileSystemBind(CHUNJUN_LIB, CHUNJUN_LIB) + .withFileSystemBind(CHUNJUN_EXAMPLES, CHUNJUN_EXAMPLES) .withLogConsumer(new Slf4jLogConsumer(LOG)); - Startables.deepStart(Stream.of(flinkStandaloneContainer)).join(); + flinkStandaloneContainer.copyFileToContainer( + MountableFile.forHostPath(CHUNJUN_BIN + "/submit.sh"), CHUNJUN_BIN + "/submit.sh"); LOG.info("Containers are started."); } @@ -148,23 +160,28 @@ public RestClusterClient getRestClusterClient() { } protected void submitSyncJobOnStandLone(String syncConf) throws Exception { - HashMap customProperties = Maps.newHashMap(); - customProperties.put( - "jobmanager.rpc.port", - flinkStandaloneContainer.getMappedPort(JOB_MANAGER_RPC_PORT)); - customProperties.put( - "rest.port", flinkStandaloneContainer.getMappedPort(JOB_MANAGER_REST_PORT)); + this.submitSyncJobOnStandLoneWithParameters(syncConf, null); + } + protected void submitSyncJobOnStandLoneWithParameters( + String syncConf, Map parameters) throws Exception { String[] syncs = new LaunchCommandBuilder("sync") - .withFlinkConfDir(FLINK_CONF_DIR_URL.toURI().getPath()) + .withFlinkConfDir("/opt/flink/conf") .withRunningMode(ClusterMode.standalone) .withJobContentPath(syncConf) .withChunJunDistDir(CHUNJUN_DIST) - .withFlinkCustomConf(customProperties) - .withAddJar(Collections.singletonList(CHUNJUN_DIST + "/chunjun-core.jar")) + .withFlinkLibDir(CHUNJUN_LIB) + .withParameters(parameters) .builder(); - Launcher.main(syncs); + Container.ExecResult execResult = + flinkStandaloneContainer.execInContainer( + "bash", CHUNJUN_BIN + "/submit.sh", String.join(" ", syncs)); + LOG.info(execResult.getStdout()); + LOG.error(execResult.getStderr()); + if (execResult.getExitCode() != 0) { + throw new AssertionError("Failed when submitting the SQL job."); + } } public JobAccumulatorResult waitUntilJobFinished(Duration timeout) @@ -212,6 +229,34 @@ public JobAccumulatorResult waitUntilJobFinished(Duration timeout) throw new RuntimeException("wait job finished timeout"); } + public void waitUntilJobRunning(Duration timeout) { + RestClusterClient clusterClient = getRestClusterClient(); + Deadline deadline = Deadline.fromNow(timeout); + while (deadline.hasTimeLeft()) { + Collection jobStatusMessages; + try { + jobStatusMessages = clusterClient.listJobs().get(10, TimeUnit.SECONDS); + } catch (Exception e) { + LOG.warn("Error when fetching job status.", e); + continue; + } + if (jobStatusMessages != null && !jobStatusMessages.isEmpty()) { + JobStatusMessage message = jobStatusMessages.iterator().next(); + JobStatus jobStatus = message.getJobState(); + if (jobStatus.isTerminalState()) { + throw new ValidationException( + String.format( + "Job has been terminated! JobName: %s, JobID: %s, Status: %s", + message.getJobName(), + message.getJobId(), + message.getJobState())); + } else if (jobStatus == JobStatus.RUNNING) { + return; + } + } + } + } + public JobAccumulatorResult printResult(Map result) { List names = Lists.newArrayList(); List values = Lists.newArrayList(); diff --git a/chunjun-e2e/src/test/resources/docker/oracle/Dockerfile b/chunjun-e2e/src/test/resources/docker/oracle/Dockerfile deleted file mode 100644 index 61c4af90d5..0000000000 --- a/chunjun-e2e/src/test/resources/docker/oracle/Dockerfile +++ /dev/null @@ -1,17 +0,0 @@ -FROM rohitbasu77/oracle11g:latest - -#hostname: localhost or docker machine ip -#port: 1521 -#sid: xe -#username: system -#password: oracle -#Password for SYS & SYSTEM is oracle -#Password for fareuser, searchuser, bookinguser, checkinuser is rohit123 - -LABEL maintainer="www.dtstack.com" - -ENV LANG=C.UTF-8 -ENV TZ=Asia/Shanghai -EXPOSE 1521 -EXPOSE 22 - diff --git a/chunjun-e2e/src/test/resources/docker/oracle/init.sql b/chunjun-e2e/src/test/resources/docker/oracle/init.sql index 104e70b2f2..890fa10ad7 100644 --- a/chunjun-e2e/src/test/resources/docker/oracle/init.sql +++ b/chunjun-e2e/src/test/resources/docker/oracle/init.sql @@ -1,4 +1,4 @@ -create table SYSTEM.ALL_TYPE1_SOURCE +create table debezium.ALL_TYPE1_SOURCE ( VARCHAR2_TYPE VARCHAR2(255), NVARCHAR2_TYPE NVARCHAR2(255), @@ -26,7 +26,7 @@ create table SYSTEM.ALL_TYPE1_SOURCE BLOB_TYPE BLOB ); -create table SYSTEM.ALL_TYPE1_SINK +create table debezium.ALL_TYPE1_SINK ( VARCHAR2_TYPE VARCHAR2(255), NVARCHAR2_TYPE NVARCHAR2(255), @@ -54,7 +54,7 @@ create table SYSTEM.ALL_TYPE1_SINK BLOB_TYPE BLOB ); -create table SYSTEM.ALL_TYPE2_SOURCE +create table debezium.ALL_TYPE2_SOURCE ( VARCHAR2_TYPE VARCHAR2(255), NVARCHAR2_TYPE NVARCHAR2(255), @@ -82,7 +82,7 @@ create table SYSTEM.ALL_TYPE2_SOURCE BLOB_TYPE BLOB ); -create table SYSTEM.ALL_TYPE2_SINK +create table debezium.ALL_TYPE2_SINK ( VARCHAR2_TYPE VARCHAR2(255), NVARCHAR2_TYPE NVARCHAR2(255), @@ -110,7 +110,7 @@ create table SYSTEM.ALL_TYPE2_SINK BLOB_TYPE BLOB ); -create table SYSTEM.TEST_SOURCE +create table debezium.TEST_SOURCE ( INT_VAL NUMBER, DOUBLE_VAL FLOAT, @@ -121,7 +121,7 @@ create table SYSTEM.TEST_SOURCE MESSAGE VARCHAR2(255) ); -create table SYSTEM.TEST_SINK +create table debezium.TEST_SINK ( INT_VAL NUMBER, DOUBLE_VAL FLOAT, @@ -132,33 +132,53 @@ create table SYSTEM.TEST_SINK MESSAGE VARCHAR2(255) ); -INSERT INTO SYSTEM.TEST_SOURCE (INT_VAL, DOUBLE_VAL, DATE_VAL, TIMESTAMP_VAL, VAR_VAL, NAME, MESSAGE) +INSERT INTO debezium.TEST_SOURCE (INT_VAL, DOUBLE_VAL, DATE_VAL, TIMESTAMP_VAL, VAR_VAL, NAME, MESSAGE) VALUES (1, 4086.104923538155, TO_DATE('2095-02-04 15:59:22', 'YYYY-MM-DD HH24:MI:SS'), TO_TIMESTAMP('2022-08-03 14:11:12.651000', 'YYYY-MM-DD HH24:MI:SS.FF6'), 'FdTY', 'Abc', 'Hello'); -INSERT INTO SYSTEM.TEST_SOURCE (INT_VAL, DOUBLE_VAL, DATE_VAL, TIMESTAMP_VAL, VAR_VAL, NAME, MESSAGE) +INSERT INTO debezium.TEST_SOURCE (INT_VAL, DOUBLE_VAL, DATE_VAL, TIMESTAMP_VAL, VAR_VAL, NAME, MESSAGE) VALUES (2, 9401.154078754176, TO_DATE('1984-10-27 23:04:04', 'YYYY-MM-DD HH24:MI:SS'), TO_TIMESTAMP('2022-08-03 14:11:12.665000', 'YYYY-MM-DD HH24:MI:SS.FF6'), 'kPDM', 'Abc', 'Hello'); -INSERT INTO SYSTEM.TEST_SOURCE (INT_VAL, DOUBLE_VAL, DATE_VAL, TIMESTAMP_VAL, VAR_VAL, NAME, MESSAGE) +INSERT INTO debezium.TEST_SOURCE (INT_VAL, DOUBLE_VAL, DATE_VAL, TIMESTAMP_VAL, VAR_VAL, NAME, MESSAGE) VALUES (3, 3654.8354065891676, TO_DATE('2082-11-01 05:25:45', 'YYYY-MM-DD HH24:MI:SS'), TO_TIMESTAMP('2022-08-03 14:11:12.665000', 'YYYY-MM-DD HH24:MI:SS.FF6'), 'fwhi7A', 'Abc', 'Hello'); -INSERT INTO SYSTEM.TEST_SOURCE (INT_VAL, DOUBLE_VAL, DATE_VAL, TIMESTAMP_VAL, VAR_VAL, NAME, MESSAGE) +INSERT INTO debezium.TEST_SOURCE (INT_VAL, DOUBLE_VAL, DATE_VAL, TIMESTAMP_VAL, VAR_VAL, NAME, MESSAGE) VALUES (4, 1700.5049489644764, TO_DATE('2060-02-01 03:18:48', 'YYYY-MM-DD HH24:MI:SS'), TO_TIMESTAMP('2022-08-03 14:11:12.666000', 'YYYY-MM-DD HH24:MI:SS.FF6'), 'Vam', 'Abc', 'Hello'); -INSERT INTO SYSTEM.TEST_SOURCE (INT_VAL, DOUBLE_VAL, DATE_VAL, TIMESTAMP_VAL, VAR_VAL, NAME, MESSAGE) +INSERT INTO debezium.TEST_SOURCE (INT_VAL, DOUBLE_VAL, DATE_VAL, TIMESTAMP_VAL, VAR_VAL, NAME, MESSAGE) VALUES (5, 7213.916066384409, TO_DATE('2027-11-14 21:55:03', 'YYYY-MM-DD HH24:MI:SS'), TO_TIMESTAMP('2022-08-03 14:11:12.666000', 'YYYY-MM-DD HH24:MI:SS.FF6'), 'X2QZAo', 'Abc', 'Hello'); -INSERT INTO SYSTEM.TEST_SOURCE (INT_VAL, DOUBLE_VAL, DATE_VAL, TIMESTAMP_VAL, VAR_VAL, NAME, MESSAGE) +INSERT INTO debezium.TEST_SOURCE (INT_VAL, DOUBLE_VAL, DATE_VAL, TIMESTAMP_VAL, VAR_VAL, NAME, MESSAGE) VALUES (7, 7494.472210715716, TO_DATE('2096-02-08 06:28:10', 'YYYY-MM-DD HH24:MI:SS'), TO_TIMESTAMP('2022-08-03 14:11:12.668000', 'YYYY-MM-DD HH24:MI:SS.FF6'), 'zW6QXgrz', 'Abc', 'Hello'); -INSERT INTO SYSTEM.TEST_SOURCE (INT_VAL, DOUBLE_VAL, DATE_VAL, TIMESTAMP_VAL, VAR_VAL, NAME, MESSAGE) +INSERT INTO debezium.TEST_SOURCE (INT_VAL, DOUBLE_VAL, DATE_VAL, TIMESTAMP_VAL, VAR_VAL, NAME, MESSAGE) VALUES (8, 4082.4893142314077, TO_DATE('2064-02-09 08:22:15', 'YYYY-MM-DD HH24:MI:SS'), TO_TIMESTAMP('2022-08-03 14:11:12.668000', 'YYYY-MM-DD HH24:MI:SS.FF6'), 'bLLICJ4', 'Abc', 'Hello'); -INSERT INTO SYSTEM.TEST_SOURCE (INT_VAL, DOUBLE_VAL, DATE_VAL, TIMESTAMP_VAL, VAR_VAL, NAME, MESSAGE) +INSERT INTO debezium.TEST_SOURCE (INT_VAL, DOUBLE_VAL, DATE_VAL, TIMESTAMP_VAL, VAR_VAL, NAME, MESSAGE) VALUES (9, 2248.440916449925, TO_DATE('2089-10-14 08:56:57', 'YYYY-MM-DD HH24:MI:SS'), TO_TIMESTAMP('2022-08-03 14:11:12.669000', 'YYYY-MM-DD HH24:MI:SS.FF6'), 'OYB4jD8s', 'Abc', 'Hello'); -INSERT INTO SYSTEM.TEST_SOURCE (INT_VAL, DOUBLE_VAL, DATE_VAL, TIMESTAMP_VAL, VAR_VAL, NAME, MESSAGE) +INSERT INTO debezium.TEST_SOURCE (INT_VAL, DOUBLE_VAL, DATE_VAL, TIMESTAMP_VAL, VAR_VAL, NAME, MESSAGE) VALUES (10, 1363.0987942903073, TO_DATE('1991-11-11 00:46:38', 'YYYY-MM-DD HH24:MI:SS'), TO_TIMESTAMP('2022-08-03 14:11:12.670000', 'YYYY-MM-DD HH24:MI:SS.FF6'), 'NqDOi', 'Abc', 'Hello'); -INSERT INTO SYSTEM.TEST_SOURCE (INT_VAL, DOUBLE_VAL, DATE_VAL, TIMESTAMP_VAL, VAR_VAL, NAME, MESSAGE) +INSERT INTO debezium.TEST_SOURCE (INT_VAL, DOUBLE_VAL, DATE_VAL, TIMESTAMP_VAL, VAR_VAL, NAME, MESSAGE) VALUES (11, 9036.620205198631, TO_DATE('2040-03-20 13:40:13', 'YYYY-MM-DD HH24:MI:SS'), TO_TIMESTAMP('2022-08-03 14:11:12.671000', 'YYYY-MM-DD HH24:MI:SS.FF6'), 'l4bezLJ', 'Abc', 'Hello'); + +CREATE TABLE debezium.ORACLE_TEST +( + TEST_INT NUMBER NOT NULL, + TEST_VARCHAR NVARCHAR2(200), + TEST_CHAR NVARCHAR2(200) +); + +CREATE TABLE debezium.ORACLE_TEST_LOGMINER +( + TEST_INT NUMBER NOT NULL, + TEST_VARCHAR NVARCHAR2(200), + TEST_CHAR NVARCHAR2(200) +); + +INSERT INTO debezium.ORACLE_TEST(TEST_INT,TEST_VARCHAR,TEST_CHAR) values(1000010,'大海','中国人'); +INSERT INTO debezium.ORACLE_TEST(TEST_INT,TEST_VARCHAR,TEST_CHAR) values(1000011,'大海','中国人'); +INSERT INTO debezium.ORACLE_TEST(TEST_INT,TEST_VARCHAR,TEST_CHAR) values(1000012,'大海','中国人'); +INSERT INTO debezium.ORACLE_TEST(TEST_INT,TEST_VARCHAR,TEST_CHAR) values(1000013,'大海','中国人'); +INSERT INTO debezium.ORACLE_TEST(TEST_INT,TEST_VARCHAR,TEST_CHAR) values(1000014,'大海','中国人'); diff --git a/chunjun-examples/json/logminer/logminer_oracle_to_oracle.json b/chunjun-examples/json/logminer/logminer_oracle_to_oracle.json new file mode 100644 index 0000000000..37db43ee94 --- /dev/null +++ b/chunjun-examples/json/logminer/logminer_oracle_to_oracle.json @@ -0,0 +1,85 @@ +{ + "job": { + "content": [ + { + "nameMapping": { + "schemaMappings": { + "DEBEZIUM": "DEBEZIUM" + }, + "tableMappings": { + "DEBEZIUM": { + "ORACLE_TEST": "ORACLE_TEST_LOGMINER" + } + }, + "fieldMappings": { + "DEBEZIUM": { + "ORACLE_TEST": { + "TEST_INT": "TEST_INT", + "TEST_VARCHAR": "TEST_VARCHAR", + "TEST_CHAR": "TEST_CHAR" + } + } + } + }, + "reader": { + "parameter": { + "jdbcUrl": "jdbc:oracle:thin:@chunjun-e2e-oracle:1521:xe", + "username": "dbzuser", + "password": "dbz", + "supportAutoAddLog": false, + "table": [ + "DEBEZIUM.ORACLE_TEST" + ], + "pavingData": false, + "split": true, + "cat": "INSERT,UPDATE,DELETE", + "readPosition": "ALL", + "queryTimeout": 3000 + }, + "name": "oraclelogminerreader" + }, + "writer": { + "parameter": { + "writeMode": "insert", + "uniqueKey": [ + ], + "allReplace": true, + "username": "dbzuser", + "password": "dbz", + "connection": [ + { + "schema": "DEBEZIUM", + "jdbcUrl": "jdbc:oracle:thin:@chunjun-e2e-oracle:1521:xe", + "table": [ + "*" + ] + } + ] + }, + "name": "oraclewriter" + } + } + ], + "setting": { + "speed": { + "bytes": 0, + "channel": 1 + }, + "errorLimit": { + "record": 1 + }, + "restore": { + "maxRowNumForCheckpoint": 0, + "isRestore": false, + "restoreColumnName": "", + "restoreColumnIndex": 0 + }, + "log": { + "isLogger": true, + "level": "debug", + "path": "", + "pattern": "" + } + } + } +} diff --git a/chunjun-examples/json/oracle/oracle_oracle.json b/chunjun-examples/json/oracle/oracle_oracle.json index 12d03c0b6e..a563687cbf 100644 --- a/chunjun-examples/json/oracle/oracle_oracle.json +++ b/chunjun-examples/json/oracle/oracle_oracle.json @@ -4,11 +4,11 @@ { "reader": { "parameter": { - "username": "system", - "password": "oracle", + "username": "dbzuser", + "password": "dbz", "connection": [{ "jdbcUrl": ["jdbc:oracle:thin:@chunjun-e2e-oracle:1521:xe"], - "table": ["SYSTEM.TEST_SOURCE"] + "table": ["DEBEZIUM.TEST_SOURCE"] }], "column": [{ "name": "INT_VAL", @@ -41,12 +41,12 @@ "mode": "insert", "updateKey": [], "allReplace": true, - "username": "system", - "password": "oracle", + "username": "dbzuser", + "password": "dbz", "connection": [ { "jdbcUrl": "jdbc:oracle:thin:@chunjun-e2e-oracle:1521:xe", - "table": ["SYSTEM.TEST_SINK"] + "table": ["DEBEZIUM.TEST_SINK"] } ], "column": [{ diff --git a/pom.xml b/pom.xml index 194d4f5547..6f5530f0ed 100644 --- a/pom.xml +++ b/pom.xml @@ -21,7 +21,6 @@ chunjun-restore chunjun-sql chunjun-assembly - chunjun-e2e @@ -401,4 +400,13 @@ + + + + e2e + + chunjun-e2e + + +