From 90e04fb76d75c108b03bec7eb7fcf86c96ab6d91 Mon Sep 17 00:00:00 2001 From: yuxiqian <34335406+yuxiqian@users.noreply.github.com> Date: Mon, 1 Jul 2024 18:17:04 +0800 Subject: [PATCH 1/2] [FLINK-35615][base] Add a workaround for timestamp binary en/decoding failure with precisions mismatch --- .../common/data/LocalZonedTimestampData.java | 4 +- .../flink/cdc/common/data/TimestampData.java | 7 +- .../cdc/common/data/ZonedTimestampData.java | 4 +- .../flink/FlinkPipelineComposerITCase.java | 118 ++++++++++++++++++ ...LocalZonedTimestampDataSerializerTest.java | 4 +- .../data/TimestampDataSerializerTest.java | 4 +- .../ZonedTimestampDataSerializerTest.java | 4 +- 7 files changed, 139 insertions(+), 6 deletions(-) diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/data/LocalZonedTimestampData.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/data/LocalZonedTimestampData.java index 9f4a40c9dec..0346f41023a 100644 --- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/data/LocalZonedTimestampData.java +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/data/LocalZonedTimestampData.java @@ -179,6 +179,8 @@ public static LocalZonedTimestampData fromInstant(Instant instant) { * milliseconds. */ public static boolean isCompact(int precision) { - return precision <= 3; + // We don't use compact mode to store any timestamps for now. See TimestampData#isCompact + // for more details. + return false; } } diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/data/TimestampData.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/data/TimestampData.java index aca9580f915..81c192fed5d 100644 --- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/data/TimestampData.java +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/data/TimestampData.java @@ -168,6 +168,11 @@ public static TimestampData fromTimestamp(Timestamp timestamp) { * Returns whether the timestamp data is small enough to be stored in a long of milliseconds. */ public static boolean isCompact(int precision) { - return precision <= 3; + // We don't use compact mode to store any timestamps for now since currently MySQL source + // could not correctly infer timestamp precision from Debezium records, and precision + // mismatch could cause downstream deserialization failure. + // By enforcing the non-compaction mode, we could ensure timestamp data with any precision + // could be correctly parsed. + return false; } } diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/data/ZonedTimestampData.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/data/ZonedTimestampData.java index e6c7def3384..2dab6c8b4fb 100644 --- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/data/ZonedTimestampData.java +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/data/ZonedTimestampData.java @@ -203,6 +203,8 @@ public static ZonedTimestampData of(long millisecond, int nanoOfMillisecond, Str * Returns whether the date-time part is small enough to be stored in a long of milliseconds. */ public static boolean isCompact(int precision) { - return precision <= 3; + // We don't use compact mode to store any timestamps for now. See TimestampData#isCompact + // for more details. + return false; } } diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java index 26c9c918751..3b4a1bb2134 100644 --- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java +++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java @@ -18,6 +18,9 @@ package org.apache.flink.cdc.composer.flink; import org.apache.flink.cdc.common.configuration.Configuration; +import org.apache.flink.cdc.common.data.LocalZonedTimestampData; +import org.apache.flink.cdc.common.data.TimestampData; +import org.apache.flink.cdc.common.data.ZonedTimestampData; import org.apache.flink.cdc.common.data.binary.BinaryStringData; import org.apache.flink.cdc.common.event.AddColumnEvent; import org.apache.flink.cdc.common.event.CreateTableEvent; @@ -46,6 +49,7 @@ import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.test.junit5.MiniClusterExtension; +import org.assertj.core.api.Assertions; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -716,4 +720,118 @@ void testRouteWithReplaceSymbol(ValuesDataSink.SinkApi sinkApi) throws Exception "DataChangeEvent{tableId=replaced_namespace.replaced_schema.table1, before=[1, 1], after=[], op=DELETE, meta=()}", "DataChangeEvent{tableId=replaced_namespace.replaced_schema.table1, before=[2, 2], after=[2, x], op=UPDATE, meta=()}"); } + + @Test + void testTimestampWithPrecision() throws Exception { + FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster(); + + // Setup value source + Configuration sourceConfig = new Configuration(); + sourceConfig.set( + ValuesDataSourceOptions.EVENT_SET_ID, + ValuesDataSourceHelper.EventSetId.CUSTOM_SOURCE_EVENTS); + + TableId sourceTable = + TableId.tableId("default_namespace", "default_schema", "timestamp_table"); + Schema sourceTableSchema = + Schema.newBuilder() + .physicalColumn("id", DataTypes.INT()) + .physicalColumn("ts0", DataTypes.TIMESTAMP(0)) + .physicalColumn("ts1", DataTypes.TIMESTAMP(1)) + .physicalColumn("ts2", DataTypes.TIMESTAMP(2)) + .physicalColumn("ts3", DataTypes.TIMESTAMP(3)) + .physicalColumn("ts4", DataTypes.TIMESTAMP(4)) + .physicalColumn("ts5", DataTypes.TIMESTAMP(5)) + .physicalColumn("ts6", DataTypes.TIMESTAMP(6)) + .physicalColumn("ts_tz0", DataTypes.TIMESTAMP_TZ(0)) + .physicalColumn("ts_tz1", DataTypes.TIMESTAMP_TZ(1)) + .physicalColumn("ts_tz2", DataTypes.TIMESTAMP_TZ(2)) + .physicalColumn("ts_tz3", DataTypes.TIMESTAMP_TZ(3)) + .physicalColumn("ts_tz4", DataTypes.TIMESTAMP_TZ(4)) + .physicalColumn("ts_tz5", DataTypes.TIMESTAMP_TZ(5)) + .physicalColumn("ts_tz6", DataTypes.TIMESTAMP_TZ(6)) + .physicalColumn("ts_ltz0", DataTypes.TIMESTAMP_LTZ(0)) + .physicalColumn("ts_ltz1", DataTypes.TIMESTAMP_LTZ(1)) + .physicalColumn("ts_ltz2", DataTypes.TIMESTAMP_LTZ(2)) + .physicalColumn("ts_ltz3", DataTypes.TIMESTAMP_LTZ(3)) + .physicalColumn("ts_ltz4", DataTypes.TIMESTAMP_LTZ(4)) + .physicalColumn("ts_ltz5", DataTypes.TIMESTAMP_LTZ(5)) + .physicalColumn("ts_ltz6", DataTypes.TIMESTAMP_LTZ(6)) + .primaryKey("id") + .build(); + List events = new ArrayList<>(); + BinaryRecordDataGenerator sourceTableDataGenerator = + new BinaryRecordDataGenerator( + sourceTableSchema.getColumnDataTypes().toArray(new DataType[0])); + + TimestampData timestampData = TimestampData.fromMillis(17_000_000_000L, 0); + ZonedTimestampData zonedTimestampData = ZonedTimestampData.of(17_000_000_000L, 0, "UTC"); + LocalZonedTimestampData localZonedTimestampData = + LocalZonedTimestampData.fromEpochMillis(17_000_000_000L, 0); + + events.add(new CreateTableEvent(sourceTable, sourceTableSchema)); + events.add( + DataChangeEvent.insertEvent( + sourceTable, + sourceTableDataGenerator.generate( + new Object[] { + 1, + timestampData, + timestampData, + timestampData, + timestampData, + timestampData, + timestampData, + timestampData, + zonedTimestampData, + zonedTimestampData, + zonedTimestampData, + zonedTimestampData, + zonedTimestampData, + zonedTimestampData, + zonedTimestampData, + localZonedTimestampData, + localZonedTimestampData, + localZonedTimestampData, + localZonedTimestampData, + localZonedTimestampData, + localZonedTimestampData, + localZonedTimestampData + }))); + + ValuesDataSourceHelper.setSourceEvents(Collections.singletonList(events)); + + SourceDef sourceDef = + new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig); + + // Setup value sink + Configuration sinkConfig = new Configuration(); + sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true); + SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig); + + // Setup pipeline + Configuration pipelineConfig = new Configuration(); + pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); + PipelineDef pipelineDef = + new PipelineDef( + sourceDef, + sinkDef, + Collections.emptyList(), + Collections.emptyList(), + pipelineConfig); + + // Execute the pipeline + PipelineExecution execution = composer.compose(pipelineDef); + execution.execute(); + Schema sinkTableSchema = ValuesDatabase.getTableSchema(sourceTable); + Assertions.assertThat(sinkTableSchema).isEqualTo(sourceTableSchema); + String[] outputEvents = outCaptor.toString().trim().split("\n"); + + Assertions.assertThat(outputEvents) + .isEqualTo( + new String[] { + "CreateTableEvent{tableId=default_namespace.default_schema.timestamp_table, schema=columns={`id` INT,`ts0` TIMESTAMP(0),`ts1` TIMESTAMP(1),`ts2` TIMESTAMP(2),`ts3` TIMESTAMP(3),`ts4` TIMESTAMP(4),`ts5` TIMESTAMP(5),`ts6` TIMESTAMP(6),`ts_tz0` TIMESTAMP(0) WITH TIME ZONE,`ts_tz1` TIMESTAMP(1) WITH TIME ZONE,`ts_tz2` TIMESTAMP(2) WITH TIME ZONE,`ts_tz3` TIMESTAMP(3) WITH TIME ZONE,`ts_tz4` TIMESTAMP(4) WITH TIME ZONE,`ts_tz5` TIMESTAMP(5) WITH TIME ZONE,`ts_tz6` TIMESTAMP(6) WITH TIME ZONE,`ts_ltz0` TIMESTAMP_LTZ(0),`ts_ltz1` TIMESTAMP_LTZ(1),`ts_ltz2` TIMESTAMP_LTZ(2),`ts_ltz3` TIMESTAMP_LTZ(3),`ts_ltz4` TIMESTAMP_LTZ(4),`ts_ltz5` TIMESTAMP_LTZ(5),`ts_ltz6` TIMESTAMP_LTZ(6)}, primaryKeys=id, options=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.timestamp_table, before=[], after=[1, 1970-07-16T18:13:20, 1970-07-16T18:13:20, 1970-07-16T18:13:20, 1970-07-16T18:13:20, 1970-07-16T18:13:20, 1970-07-16T18:13:20, 1970-07-16T18:13:20, 1970-07-16T18:13:20Z, 1970-07-16T18:13:20Z, 1970-07-16T18:13:20Z, 1970-07-16T18:13:20Z, 1970-07-16T18:13:20Z, 1970-07-16T18:13:20Z, 1970-07-16T18:13:20Z, 1970-07-16T18:13:20, 1970-07-16T18:13:20, 1970-07-16T18:13:20, 1970-07-16T18:13:20, 1970-07-16T18:13:20, 1970-07-16T18:13:20, 1970-07-16T18:13:20], op=INSERT, meta=()}" + }); + } } diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/data/LocalZonedTimestampDataSerializerTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/data/LocalZonedTimestampDataSerializerTest.java index a0f4b226608..dbe02b17859 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/data/LocalZonedTimestampDataSerializerTest.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/data/LocalZonedTimestampDataSerializerTest.java @@ -21,6 +21,8 @@ import org.apache.flink.cdc.common.data.LocalZonedTimestampData; import org.apache.flink.cdc.runtime.serializer.SerializerTestBase; +import static org.apache.flink.cdc.common.data.LocalZonedTimestampData.isCompact; + /** A test for the {@link LocalZonedTimestampDataSerializer}. */ abstract class LocalZonedTimestampDataSerializerTest extends SerializerTestBase { @@ -31,7 +33,7 @@ protected TypeSerializer createSerializer() { @Override protected int getLength() { - return (getPrecision() <= 3) ? 8 : 12; + return isCompact(getPrecision()) ? 8 : 12; } @Override diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/data/TimestampDataSerializerTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/data/TimestampDataSerializerTest.java index 7b0afe01eae..a3b61f54c8b 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/data/TimestampDataSerializerTest.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/data/TimestampDataSerializerTest.java @@ -21,6 +21,8 @@ import org.apache.flink.cdc.common.data.TimestampData; import org.apache.flink.cdc.runtime.serializer.SerializerTestBase; +import static org.apache.flink.cdc.common.data.TimestampData.isCompact; + /** A test for the {@link TimestampDataSerializer}. */ abstract class TimestampDataSerializerTest extends SerializerTestBase { @Override @@ -30,7 +32,7 @@ protected TypeSerializer createSerializer() { @Override protected int getLength() { - return (getPrecision() <= 3) ? 8 : 12; + return isCompact(getPrecision()) ? 8 : 12; } @Override diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/data/ZonedTimestampDataSerializerTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/data/ZonedTimestampDataSerializerTest.java index 1ee44be8402..02d7e068cdc 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/data/ZonedTimestampDataSerializerTest.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/data/ZonedTimestampDataSerializerTest.java @@ -24,6 +24,8 @@ import java.time.LocalDateTime; import java.time.ZoneId; +import static org.apache.flink.cdc.common.data.ZonedTimestampData.isCompact; + /** A test for the {@link ZonedTimestampDataSerializer}. */ abstract class ZonedTimestampDataSerializerTest extends SerializerTestBase { @Override @@ -33,7 +35,7 @@ protected TypeSerializer createSerializer() { @Override protected int getLength() { - return (getPrecision() <= 3) ? 8 : 12; + return isCompact(getPrecision()) ? 8 : 12; } @Override From dce9202aebe07f74179251c33b196687e6fb2b5a Mon Sep 17 00:00:00 2001 From: yuxiqian <34335406+yuxiqian@users.noreply.github.com> Date: Thu, 1 Aug 2024 15:47:04 +0800 Subject: [PATCH 2/2] Test if MySQL handles it correctly --- .../tests/MySqlTimestampE2eITCase.java | 199 ++++++++++++++++++ .../test/resources/ddl/mysql_timestamp.sql | 75 +++++++ 2 files changed, 274 insertions(+) create mode 100644 flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlTimestampE2eITCase.java create mode 100644 flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/ddl/mysql_timestamp.sql diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlTimestampE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlTimestampE2eITCase.java new file mode 100644 index 00000000000..eb7fc71bf92 --- /dev/null +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlTimestampE2eITCase.java @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.pipeline.tests; + +import org.apache.flink.cdc.common.test.utils.TestUtils; +import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer; +import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion; +import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase; +import org.apache.flink.cdc.pipeline.tests.utils.PipelineTestEnvironment; + +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.output.Slf4jLogConsumer; + +import java.nio.file.Path; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; +import java.time.Duration; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.TimeoutException; + +/** End-to-end tests for mysql cdc pipeline job for timestamp data. */ +@RunWith(Parameterized.class) +public class MySqlTimestampE2eITCase extends PipelineTestEnvironment { + private static final Logger LOG = LoggerFactory.getLogger(MySqlTimestampE2eITCase.class); + + // ------------------------------------------------------------------------------------------ + // MySQL Variables (we always use MySQL as the data source for easier verifying) + // ------------------------------------------------------------------------------------------ + protected static final String MYSQL_TEST_USER = "mysqluser"; + protected static final String MYSQL_TEST_PASSWORD = "mysqlpw"; + protected static final String MYSQL_DRIVER_CLASS = "com.mysql.cj.jdbc.Driver"; + protected static final String INTER_CONTAINER_MYSQL_ALIAS = "mysql"; + + @ClassRule + public static final MySqlContainer MYSQL = + (MySqlContainer) + new MySqlContainer( + MySqlVersion.V8_0) // v8 support both ARM and AMD architectures + .withConfigurationOverride("docker/mysql/my.cnf") + .withSetupSQL("docker/mysql/setup.sql") + .withDatabaseName("flink-test") + .withUsername("flinkuser") + .withPassword("flinkpw") + .withNetwork(NETWORK) + .withNetworkAliases(INTER_CONTAINER_MYSQL_ALIAS) + .withLogConsumer(new Slf4jLogConsumer(LOG)); + + protected final UniqueDatabase mysqlInventoryDatabase = + new UniqueDatabase(MYSQL, "mysql_timestamp", MYSQL_TEST_USER, MYSQL_TEST_PASSWORD); + + @Before + public void before() throws Exception { + super.before(); + mysqlInventoryDatabase.createAndInitialize(); + } + + @After + public void after() { + super.after(); + mysqlInventoryDatabase.dropDatabase(); + } + + @Test + public void testSyncWholeDatabase() throws Exception { + String pipelineJob = + String.format( + "source:\n" + + " type: mysql\n" + + " hostname: %s\n" + + " port: 3306\n" + + " username: %s\n" + + " password: %s\n" + + " tables: %s.\\.*\n" + + " server-id: 5400-5404\n" + + " server-time-zone: UTC\n" + + "\n" + + "sink:\n" + + " type: values\n" + + "\n" + + "pipeline:\n" + + " parallelism: 1", + INTER_CONTAINER_MYSQL_ALIAS, + MYSQL_TEST_USER, + MYSQL_TEST_PASSWORD, + mysqlInventoryDatabase.getDatabaseName()); + Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar"); + Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar"); + Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar"); + submitPipelineJob(pipelineJob, mysqlCdcJar, valuesCdcJar, mysqlDriverJar); + waitUntilJobRunning(Duration.ofSeconds(30)); + LOG.info("Pipeline job is running"); + waitUntilSpecificEvent( + String.format( + "CreateTableEvent{tableId=%s.timestamps, schema=columns={`id` INT NOT NULL,`ts0` TIMESTAMP_LTZ(0),`ts1` TIMESTAMP_LTZ(1),`ts2` TIMESTAMP_LTZ(2),`ts3` TIMESTAMP_LTZ(3),`ts4` TIMESTAMP_LTZ(4),`ts5` TIMESTAMP_LTZ(5),`ts6` TIMESTAMP_LTZ(6),`dt0` TIMESTAMP(0),`dt1` TIMESTAMP(1),`dt2` TIMESTAMP(2),`dt3` TIMESTAMP(3),`dt4` TIMESTAMP(4),`dt5` TIMESTAMP(5),`dt6` TIMESTAMP(6)}, primaryKeys=id, options=()}", + mysqlInventoryDatabase.getDatabaseName()), + 60000L); + + List expectedEvents = + Arrays.asList( + String.format( + "DataChangeEvent{tableId=%s.timestamps, before=[], after=[1, 2019-01-01T01:01:01, 2019-01-01T01:01:01.100, 2019-01-01T01:01:01.120, 2019-01-01T01:01:01.123, 2019-01-01T01:01:01.123400, 2019-01-01T01:01:01.123450, 2019-01-01T01:01:01.123456, 2019-01-01T01:01:01, 2019-01-01T01:01:01.100, 2019-01-01T01:01:01.120, 2019-01-01T01:01:01.123, 2019-01-01T01:01:01.123400, 2019-01-01T01:01:01.123450, 2019-01-01T01:01:01.123456], op=INSERT, meta=()}\n", + mysqlInventoryDatabase.getDatabaseName()), + String.format( + "DataChangeEvent{tableId=%s.timestamps, before=[], after=[2, 2019-01-01T01:01:01, 2019-01-01T01:01:01, 2019-01-01T01:01:01, 2019-01-01T01:01:01, 2019-01-01T01:01:01, 2019-01-01T01:01:01, 2019-01-01T01:01:01, 2019-01-01T01:01:01, 2019-01-01T01:01:01, 2019-01-01T01:01:01, 2019-01-01T01:01:01, 2019-01-01T01:01:01, 2019-01-01T01:01:01, 2019-01-01T01:01:01], op=INSERT, meta=()}", + mysqlInventoryDatabase.getDatabaseName())); + validateResult(expectedEvents); + LOG.info("Begin incremental reading stage."); + // generate binlogs + String mysqlJdbcUrl = + String.format( + "jdbc:mysql://%s:%s/%s", + MYSQL.getHost(), + MYSQL.getDatabasePort(), + mysqlInventoryDatabase.getDatabaseName()); + try (Connection conn = + DriverManager.getConnection( + mysqlJdbcUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD); + Statement stat = conn.createStatement()) { + stat.execute( + "INSERT INTO timestamps\n" + + "VALUES (\n" + + " 3,\n" + + " '2019-12-31 23:59:59',\n" + + " '2019-12-31 23:59:59',\n" + + " '2019-12-31 23:59:59',\n" + + " '2019-12-31 23:59:59',\n" + + " '2019-12-31 23:59:59',\n" + + " '2019-12-31 23:59:59',\n" + + " '2019-12-31 23:59:59',\n" + + " '2019-12-31 23:59:59',\n" + + " '2019-12-31 23:59:59',\n" + + " '2019-12-31 23:59:59',\n" + + " '2019-12-31 23:59:59',\n" + + " '2019-12-31 23:59:59',\n" + + " '2019-12-31 23:59:59',\n" + + " '2019-12-31 23:59:59'\n" + + " );"); + } catch (SQLException e) { + LOG.error("Update table for CDC failed.", e); + throw e; + } + waitUntilSpecificEvent( + String.format( + "DataChangeEvent{tableId=%s.timestamps, before=[], after=[3, 2019-12-31T23:59:59, 2019-12-31T23:59:59, 2019-12-31T23:59:59, 2019-12-31T23:59:59, 2019-12-31T23:59:59, 2019-12-31T23:59:59, 2019-12-31T23:59:59, 2019-12-31T23:59:59, 2019-12-31T23:59:59, 2019-12-31T23:59:59, 2019-12-31T23:59:59, 2019-12-31T23:59:59, 2019-12-31T23:59:59, 2019-12-31T23:59:59], op=INSERT, meta=()}", + mysqlInventoryDatabase.getDatabaseName()), + 6000L); + } + + private void validateResult(List expectedEvents) throws Exception { + for (String event : expectedEvents) { + waitUntilSpecificEvent(event, 6000L); + } + } + + private void waitUntilSpecificEvent(String event, long timeout) throws Exception { + boolean result = false; + long endTimeout = System.currentTimeMillis() + timeout; + while (System.currentTimeMillis() < endTimeout) { + String stdout = taskManagerConsumer.toUtf8String(); + if (stdout.contains(event)) { + result = true; + break; + } + Thread.sleep(1000); + } + if (!result) { + throw new TimeoutException( + "failed to get specific event: " + + event + + " from stdout: " + + taskManagerConsumer.toUtf8String()); + } + } +} diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/ddl/mysql_timestamp.sql b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/ddl/mysql_timestamp.sql new file mode 100644 index 00000000000..55e3dc6bb40 --- /dev/null +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/ddl/mysql_timestamp.sql @@ -0,0 +1,75 @@ +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. + +-- ---------------------------------------------------------------------------------------------------------------- +-- DATABASE: mysql_inventory +-- ---------------------------------------------------------------------------------------------------------------- + +-- Create and populate our products using a single insert with many rows +CREATE TABLE timestamps ( + id INTEGER NOT NULL PRIMARY KEY, + ts0 TIMESTAMP(0), + ts1 TIMESTAMP(1), + ts2 TIMESTAMP(2), + ts3 TIMESTAMP(3), + ts4 TIMESTAMP(4), + ts5 TIMESTAMP(5), + ts6 TIMESTAMP(6), + dt0 DATETIME(0), + dt1 DATETIME(1), + dt2 DATETIME(2), + dt3 DATETIME(3), + dt4 DATETIME(4), + dt5 DATETIME(5), + dt6 DATETIME(6) +); + +INSERT INTO timestamps +VALUES ( + 1, + '2019-01-01 01:01:01', + '2019-01-01 01:01:01.1', + '2019-01-01 01:01:01.12', + '2019-01-01 01:01:01.123', + '2019-01-01 01:01:01.1234', + '2019-01-01 01:01:01.12345', + '2019-01-01 01:01:01.123456', + '2019-01-01 01:01:01', + '2019-01-01 01:01:01.1', + '2019-01-01 01:01:01.12', + '2019-01-01 01:01:01.123', + '2019-01-01 01:01:01.1234', + '2019-01-01 01:01:01.12345', + '2019-01-01 01:01:01.123456' +); + +INSERT INTO timestamps +VALUES ( + 2, + '2019-01-01 01:01:01', + '2019-01-01 01:01:01', + '2019-01-01 01:01:01', + '2019-01-01 01:01:01', + '2019-01-01 01:01:01', + '2019-01-01 01:01:01', + '2019-01-01 01:01:01', + '2019-01-01 01:01:01', + '2019-01-01 01:01:01', + '2019-01-01 01:01:01', + '2019-01-01 01:01:01', + '2019-01-01 01:01:01', + '2019-01-01 01:01:01', + '2019-01-01 01:01:01' + ); \ No newline at end of file