diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/FlushEvent.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/FlushEvent.java
index 468faa8b5b6..798552e0499 100644
--- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/FlushEvent.java
+++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/FlushEvent.java
@@ -28,14 +28,25 @@ public class FlushEvent implements Event {
/** The schema changes from which table. */
private final TableId tableId;
- public FlushEvent(TableId tableId) {
+ /**
+ * Nonce code to distinguish flush events corresponding to each schema change event from
+ * different subTasks.
+ */
+ private final long nonce;
+
+ public FlushEvent(TableId tableId, long nonce) {
this.tableId = tableId;
+ this.nonce = nonce;
}
public TableId getTableId() {
return tableId;
}
+ public long getNonce() {
+ return nonce;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -45,11 +56,16 @@ public boolean equals(Object o) {
return false;
}
FlushEvent that = (FlushEvent) o;
- return Objects.equals(tableId, that.tableId);
+ return Objects.equals(tableId, that.tableId) && Objects.equals(nonce, that.nonce);
}
@Override
public int hashCode() {
- return Objects.hash(tableId);
+ return Objects.hash(tableId, nonce);
+ }
+
+ @Override
+ public String toString() {
+ return "FlushEvent{" + "tableId=" + tableId + ", nonce=" + nonce + '}';
}
}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/pom.xml b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/pom.xml
index 944175e52a0..155c8240dc5 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/pom.xml
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/pom.xml
@@ -46,6 +46,20 @@ limitations under the License.
test-jar
+
+ org.apache.flink
+ flink-cdc-composer
+ ${project.version}
+ test
+
+
+
+ org.apache.flink
+ flink-cdc-pipeline-connector-values
+ ${project.version}
+ test
+
+
org.apache.flink
flink-connector-test-util
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlParallelizedPipelineITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlParallelizedPipelineITCase.java
new file mode 100644
index 00000000000..4bd9806a580
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlParallelizedPipelineITCase.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.mysql.source;
+
+import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.cdc.common.pipeline.PipelineOptions;
+import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
+import org.apache.flink.cdc.composer.PipelineExecution;
+import org.apache.flink.cdc.composer.definition.PipelineDef;
+import org.apache.flink.cdc.composer.definition.SinkDef;
+import org.apache.flink.cdc.composer.definition.SourceDef;
+import org.apache.flink.cdc.composer.flink.FlinkPipelineComposer;
+import org.apache.flink.cdc.connectors.mysql.factory.MySqlDataSourceFactory;
+import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
+import org.apache.flink.cdc.connectors.values.ValuesDatabase;
+import org.apache.flink.cdc.connectors.values.factory.ValuesDataFactory;
+import org.apache.flink.cdc.connectors.values.sink.ValuesDataSinkOptions;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+import static org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.TEST_PASSWORD;
+import static org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.TEST_USER;
+import static org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.getServerId;
+import static org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.loopCheck;
+import static org.apache.flink.configuration.CoreOptions.ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Parallelized Integration test for MySQL connector. */
+public class MySqlParallelizedPipelineITCase extends MySqlSourceTestBase {
+
+ private static final int PARALLELISM = 4;
+ private static final int TEST_TABLE_NUMBER = 100;
+
+ // Always use parent-first classloader for CDC classes.
+ // The reason is that ValuesDatabase uses static field for holding data, we need to make sure
+ // the class is loaded by AppClassloader so that we can verify data in the test case.
+ private static final org.apache.flink.configuration.Configuration MINI_CLUSTER_CONFIG =
+ new org.apache.flink.configuration.Configuration();
+
+ static {
+ MINI_CLUSTER_CONFIG.set(
+ ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL,
+ Collections.singletonList("org.apache.flink.cdc"));
+ }
+
+ private final PrintStream standardOut = System.out;
+ private final ByteArrayOutputStream outCaptor = new ByteArrayOutputStream();
+
+ private final UniqueDatabase parallelismDatabase =
+ new UniqueDatabase(
+ MYSQL_CONTAINER, "extreme_parallelism_test_database", TEST_USER, TEST_PASSWORD);
+
+ @Before
+ public void init() {
+ // Take over STDOUT as we need to check the output of values sink
+ System.setOut(new PrintStream(outCaptor));
+ // Initialize in-memory database
+ ValuesDatabase.clear();
+ }
+
+ @After
+ public void cleanup() {
+ System.setOut(standardOut);
+ }
+
+ @Test
+ public void testExtremeParallelizedSchemaChange() throws Exception {
+ final String databaseName = parallelismDatabase.getDatabaseName();
+ try (Connection conn =
+ DriverManager.getConnection(
+ MYSQL_CONTAINER.getJdbcUrl(), TEST_USER, TEST_PASSWORD);
+ Statement stat = conn.createStatement()) {
+ stat.execute(String.format("CREATE DATABASE %s;", databaseName));
+ stat.execute(String.format("USE %s;", databaseName));
+ for (int i = 1; i <= TEST_TABLE_NUMBER; i++) {
+ stat.execute(String.format("DROP TABLE IF EXISTS TABLE%d;", i));
+ stat.execute(
+ String.format(
+ "CREATE TABLE TABLE%d (ID INT NOT NULL PRIMARY KEY,VERSION VARCHAR(17));",
+ i));
+ stat.execute(String.format("INSERT INTO TABLE%d VALUES (%d, 'No.%d');", i, i, i));
+ }
+ } catch (SQLException e) {
+ LOG.error("Initialize table failed.", e);
+ throw e;
+ }
+ LOG.info("Table initialized successfully.");
+
+ FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
+
+ // Setup MySQL source
+ Configuration sourceConfig = new Configuration();
+ sourceConfig.set(MySqlDataSourceOptions.HOSTNAME, MYSQL_CONTAINER.getHost());
+ sourceConfig.set(MySqlDataSourceOptions.PORT, MYSQL_CONTAINER.getDatabasePort());
+ sourceConfig.set(MySqlDataSourceOptions.USERNAME, TEST_USER);
+ sourceConfig.set(MySqlDataSourceOptions.PASSWORD, TEST_PASSWORD);
+ sourceConfig.set(MySqlDataSourceOptions.SERVER_TIME_ZONE, "UTC");
+ sourceConfig.set(MySqlDataSourceOptions.TABLES, "\\.*.\\.*");
+ sourceConfig.set(MySqlDataSourceOptions.SERVER_ID, getServerId(PARALLELISM));
+
+ SourceDef sourceDef =
+ new SourceDef(MySqlDataSourceFactory.IDENTIFIER, "MySQL Source", sourceConfig);
+
+ // Setup value sink
+ Configuration sinkConfig = new Configuration();
+ sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
+ SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);
+
+ // Setup pipeline
+ Configuration pipelineConfig = new Configuration();
+ pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, PARALLELISM);
+ pipelineConfig.set(
+ PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
+ PipelineDef pipelineDef =
+ new PipelineDef(
+ sourceDef,
+ sinkDef,
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ pipelineConfig);
+
+ // Execute the pipeline
+ PipelineExecution execution = composer.compose(pipelineDef);
+ Thread executeThread =
+ new Thread(
+ () -> {
+ try {
+ execution.execute();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ executeThread.start();
+
+ try {
+ loopCheck(
+ () ->
+ outCaptor.toString().trim().split("\n").length
+ >= TEST_TABLE_NUMBER * (PARALLELISM + 1),
+ "collect enough rows",
+ Duration.ofSeconds(120),
+ Duration.ofSeconds(1));
+ } finally {
+ executeThread.interrupt();
+ }
+
+ // Check the order and content of all received events
+ String outputEvents = outCaptor.toString();
+ assertThat(outputEvents)
+ .contains(
+ IntStream.rangeClosed(1, TEST_TABLE_NUMBER)
+ .boxed()
+ .flatMap(
+ i ->
+ Stream.concat(
+ IntStream.range(0, PARALLELISM)
+ .boxed()
+ .map(
+ subTaskId ->
+ String.format(
+ "%d> CreateTableEvent{tableId=%s.TABLE%d, schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17)}, primaryKeys=ID, options=()}",
+ subTaskId,
+ parallelismDatabase
+ .getDatabaseName(),
+ i)),
+ Stream.of(
+ String.format(
+ "> DataChangeEvent{tableId=%s.TABLE%d, before=[], after=[%d, No.%d], op=INSERT, meta=()}",
+ parallelismDatabase
+ .getDatabaseName(),
+ i,
+ i,
+ i))))
+ .collect(Collectors.toList()));
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/testutils/MySqSourceTestUtils.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/testutils/MySqSourceTestUtils.java
index a76838d664c..d193341eacc 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/testutils/MySqSourceTestUtils.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/testutils/MySqSourceTestUtils.java
@@ -20,10 +20,13 @@
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cdc.common.event.CreateTableEvent;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Supplier;
/** Test utilities for MySQL event source. */
public class MySqSourceTestUtils {
@@ -63,5 +66,19 @@ public static String getServerId(int parallelism) {
return serverId + "-" + (serverId + parallelism);
}
+ public static void loopCheck(
+ Supplier runnable, String description, Duration timeout, Duration interval)
+ throws Exception {
+ long deadline = System.currentTimeMillis() + timeout.toMillis();
+ while (System.currentTimeMillis() < deadline) {
+ if (runnable.get()) {
+ return;
+ }
+ Thread.sleep(interval.toMillis());
+ }
+ throw new TimeoutException(
+ "Ran out of time when waiting for " + description + " to success.");
+ }
+
private MySqSourceTestUtils() {}
}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketAssignOperator.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketAssignOperator.java
index 358d7c36ce3..3a9801fcdea 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketAssignOperator.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketAssignOperator.java
@@ -124,7 +124,9 @@ public void processElement(StreamRecord streamRecord) throws Exception {
output.collect(
new StreamRecord<>(
new BucketWrapperFlushEvent(
- currentTaskNumber, ((FlushEvent) event).getTableId())));
+ currentTaskNumber,
+ ((FlushEvent) event).getTableId(),
+ ((FlushEvent) event).getNonce())));
return;
}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperEventSerializer.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperEventSerializer.java
index f37d9952f98..57803643b55 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperEventSerializer.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperEventSerializer.java
@@ -82,6 +82,7 @@ public void serialize(Event event, DataOutputView dataOutputView) throws IOExcep
BucketWrapperFlushEvent bucketWrapperFlushEvent = (BucketWrapperFlushEvent) event;
dataOutputView.writeInt(bucketWrapperFlushEvent.getBucket());
tableIdSerializer.serialize(bucketWrapperFlushEvent.getTableId(), dataOutputView);
+ dataOutputView.writeLong(bucketWrapperFlushEvent.getNonce());
}
}
@@ -90,7 +91,7 @@ public Event deserialize(DataInputView source) throws IOException {
EventClass eventClass = enumSerializer.deserialize(source);
if (eventClass.equals(EventClass.BUCKET_WRAPPER_FLUSH_EVENT)) {
return new BucketWrapperFlushEvent(
- source.readInt(), tableIdSerializer.deserialize(source));
+ source.readInt(), tableIdSerializer.deserialize(source), source.readLong());
} else {
return new BucketWrapperChangeEvent(
source.readInt(), (ChangeEvent) eventSerializer.deserialize(source));
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperFlushEvent.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperFlushEvent.java
index 046a22a318f..5de479e5fa2 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperFlushEvent.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperFlushEvent.java
@@ -27,8 +27,8 @@ public class BucketWrapperFlushEvent extends FlushEvent implements BucketWrapper
private final int bucket;
- public BucketWrapperFlushEvent(int bucket, TableId tableId) {
- super(tableId);
+ public BucketWrapperFlushEvent(int bucket, TableId tableId, long nonce) {
+ super(tableId, nonce);
this.bucket = bucket;
}
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/RouteE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/RouteE2eITCase.java
index c8072cf7d3e..c9b26c7818b 100644
--- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/RouteE2eITCase.java
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/RouteE2eITCase.java
@@ -24,7 +24,6 @@
import org.apache.flink.cdc.pipeline.tests.utils.PipelineTestEnvironment;
import org.junit.After;
-import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
@@ -41,6 +40,7 @@
import java.sql.Statement;
import java.time.Duration;
import java.util.concurrent.TimeoutException;
+import java.util.stream.IntStream;
/** E2e tests for routing features. */
@RunWith(Parameterized.class)
@@ -55,6 +55,7 @@ public class RouteE2eITCase extends PipelineTestEnvironment {
protected static final String MYSQL_DRIVER_CLASS = "com.mysql.cj.jdbc.Driver";
protected static final String INTER_CONTAINER_MYSQL_ALIAS = "mysql";
protected static final long EVENT_DEFAULT_TIMEOUT = 60000L;
+ protected static final int TEST_TABLE_NUMBER = 100;
@ClassRule
public static final MySqlContainer MYSQL =
@@ -73,6 +74,9 @@ public class RouteE2eITCase extends PipelineTestEnvironment {
protected final UniqueDatabase routeTestDatabase =
new UniqueDatabase(MYSQL, "route_test", MYSQL_TEST_USER, MYSQL_TEST_PASSWORD);
+ protected final UniqueDatabase extremeRouteTestDatabase =
+ new UniqueDatabase(MYSQL, "extreme_route_test", MYSQL_TEST_USER, MYSQL_TEST_PASSWORD);
+
@Before
public void before() throws Exception {
super.before();
@@ -814,15 +818,98 @@ public void testReplacementSymbol() throws Exception {
"DataChangeEvent{tableId=NEW_%s.NEW_TABLEDELTA, before=[], after=[10004], op=INSERT, meta=()}");
}
+ @Test
+ public void testExtremeMergeTableRoute() throws Exception {
+ final String databaseName = extremeRouteTestDatabase.getDatabaseName();
+ try (Connection conn =
+ DriverManager.getConnection(
+ MYSQL.getJdbcUrl(), MYSQL_TEST_USER, MYSQL_TEST_PASSWORD);
+ Statement stat = conn.createStatement()) {
+ stat.execute(String.format("CREATE DATABASE %s;", databaseName));
+ stat.execute(String.format("USE %s;", databaseName));
+ for (int i = 1; i <= TEST_TABLE_NUMBER; i++) {
+ stat.execute(String.format("DROP TABLE IF EXISTS TABLE%d;", i));
+ stat.execute(
+ String.format(
+ "CREATE TABLE TABLE%d (ID INT NOT NULL PRIMARY KEY,VERSION VARCHAR(17));",
+ i));
+ stat.execute(String.format("INSERT INTO TABLE%d VALUES (%d, 'No.%d');", i, i, i));
+ }
+ } catch (SQLException e) {
+ LOG.error("Initialize table failed.", e);
+ throw e;
+ }
+ LOG.info("Table initialized successfully.");
+
+ String pipelineJob =
+ String.format(
+ "source:\n"
+ + " type: mysql\n"
+ + " hostname: %s\n"
+ + " port: 3306\n"
+ + " username: %s\n"
+ + " password: %s\n"
+ + " tables: %s.\\.*\n"
+ + " server-id: 5400-5404\n"
+ + " server-time-zone: UTC\n"
+ + "\n"
+ + "sink:\n"
+ + " type: values\n"
+ + "\n"
+ + "pipeline:\n"
+ + " parallelism: %d",
+ INTER_CONTAINER_MYSQL_ALIAS,
+ MYSQL_TEST_USER,
+ MYSQL_TEST_PASSWORD,
+ databaseName,
+ parallelism);
+ Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
+ Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar");
+ Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
+ submitPipelineJob(pipelineJob, mysqlCdcJar, valuesCdcJar, mysqlDriverJar);
+ waitUntilJobRunning(Duration.ofSeconds(30));
+
+ LOG.info("Verifying CreateTableEvents...");
+ validateResult(
+ 180_000L,
+ IntStream.rangeClosed(1, TEST_TABLE_NUMBER)
+ .mapToObj(
+ i ->
+ String.format(
+ "> CreateTableEvent{tableId=%s.TABLE%d, schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17)}, primaryKeys=ID, options=()}",
+ databaseName, i))
+ .toArray(String[]::new));
+
+ LOG.info("Verifying DataChangeEvents...");
+ validateResult(
+ 180_000L,
+ IntStream.rangeClosed(1, TEST_TABLE_NUMBER)
+ .mapToObj(
+ i ->
+ String.format(
+ "> DataChangeEvent{tableId=%s.TABLE%d, before=[], after=[%d, No.%d], op=INSERT, meta=()}",
+ databaseName, i, i, i))
+ .toArray(String[]::new));
+ }
+
private void validateResult(String... expectedEvents) throws Exception {
+ validateResult(EVENT_DEFAULT_TIMEOUT, expectedEvents);
+ }
+
+ private void validateResult(long timeout, String... expectedEvents) throws Exception {
for (String event : expectedEvents) {
- waitUntilSpecificEvent(String.format(event, routeTestDatabase.getDatabaseName()));
+ waitUntilSpecificEvent(
+ timeout, String.format(event, routeTestDatabase.getDatabaseName()));
}
}
private void waitUntilSpecificEvent(String event) throws Exception {
+ waitUntilSpecificEvent(EVENT_DEFAULT_TIMEOUT, event);
+ }
+
+ private void waitUntilSpecificEvent(long timeout, String event) throws Exception {
boolean result = false;
- long endTimeout = System.currentTimeMillis() + EVENT_DEFAULT_TIMEOUT;
+ long endTimeout = System.currentTimeMillis() + timeout;
while (System.currentTimeMillis() < endTimeout) {
String stdout = taskManagerConsumer.toUtf8String();
if (stdout.contains(event + "\n")) {
@@ -839,8 +926,4 @@ private void waitUntilSpecificEvent(String event) throws Exception {
+ taskManagerConsumer.toUtf8String());
}
}
-
- private void assertNotExists(String event) {
- Assert.assertFalse(taskManagerConsumer.toUtf8String().contains(event));
- }
}
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java
index 6ada032d56f..179a71e2dfa 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java
@@ -53,6 +53,7 @@
import org.apache.flink.cdc.runtime.operators.schema.metrics.SchemaOperatorMetrics;
import org.apache.flink.cdc.runtime.operators.sink.SchemaEvolutionClient;
import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
+import org.apache.flink.cdc.runtime.typeutils.NonceUtils;
import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
@@ -78,6 +79,7 @@
import java.io.Serializable;
import java.math.BigDecimal;
import java.time.Duration;
+import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.ArrayList;
@@ -430,6 +432,11 @@ private TableId resolveReplacement(
return TableId.parse(route.f1);
}
+ @VisibleForTesting
+ protected int getCurrentTimestamp() {
+ return (int) Instant.now().getEpochSecond();
+ }
+
private void handleSchemaChangeEvent(TableId tableId, SchemaChangeEvent schemaChangeEvent)
throws InterruptedException, TimeoutException {
@@ -442,16 +449,21 @@ private void handleSchemaChangeEvent(TableId tableId, SchemaChangeEvent schemaCh
schemaChangeEvent));
}
+ long nonce =
+ NonceUtils.generateNonce(
+ getCurrentTimestamp(), subTaskId, tableId, schemaChangeEvent);
+
+ LOG.info("{}> Sending the FlushEvent for table {} (nonce: {}).", subTaskId, tableId, nonce);
+ output.collect(new StreamRecord<>(new FlushEvent(tableId, nonce)));
+
// The request will block if another schema change event is being handled
- SchemaChangeResponse response = requestSchemaChange(tableId, schemaChangeEvent);
+ SchemaChangeResponse response = requestSchemaChange(tableId, schemaChangeEvent, nonce);
if (response.isAccepted()) {
- LOG.info("{}> Sending the FlushEvent for table {}.", subTaskId, tableId);
- output.collect(new StreamRecord<>(new FlushEvent(tableId)));
List expectedSchemaChangeEvents = response.getSchemaChangeEvents();
schemaOperatorMetrics.increaseSchemaChangeEvents(expectedSchemaChangeEvents.size());
// The request will block until flushing finished in each sink writer
- SchemaChangeResultResponse schemaEvolveResponse = requestSchemaChangeResult();
+ SchemaChangeResultResponse schemaEvolveResponse = requestSchemaChangeResult(nonce);
List finishedSchemaChangeEvents =
schemaEvolveResponse.getFinishedSchemaChangeEvents();
@@ -473,37 +485,44 @@ private void handleSchemaChangeEvent(TableId tableId, SchemaChangeEvent schemaCh
}
private SchemaChangeResponse requestSchemaChange(
- TableId tableId, SchemaChangeEvent schemaChangeEvent)
+ TableId tableId, SchemaChangeEvent schemaChangeEvent, long nonce)
throws InterruptedException, TimeoutException {
long schemaEvolveTimeOutMillis = System.currentTimeMillis() + rpcTimeOutInMillis;
while (true) {
SchemaChangeResponse response =
sendRequestToCoordinator(
- new SchemaChangeRequest(tableId, schemaChangeEvent, subTaskId));
- if (response.isRegistryBusy()) {
- if (System.currentTimeMillis() < schemaEvolveTimeOutMillis) {
+ new SchemaChangeRequest(tableId, schemaChangeEvent, subTaskId, nonce));
+ if (System.currentTimeMillis() < schemaEvolveTimeOutMillis) {
+ if (response.isRegistryBusy()) {
LOG.info(
"{}> Schema Registry is busy now, waiting for next request...",
subTaskId);
Thread.sleep(1000);
+ } else if (response.isWaitingForFlush()) {
+ LOG.info(
+ "{}> Schema change event (with nonce {}) has not collected enough flush success events from writers, waiting...",
+ subTaskId,
+ nonce);
+ Thread.sleep(1000);
} else {
- throw new TimeoutException("TimeOut when requesting schema change");
+ return response;
}
} else {
- return response;
+ throw new TimeoutException("TimeOut when requesting schema change");
}
}
}
- private SchemaChangeResultResponse requestSchemaChangeResult()
+ private SchemaChangeResultResponse requestSchemaChangeResult(long nonce)
throws InterruptedException, TimeoutException {
CoordinationResponse coordinationResponse =
- sendRequestToCoordinator(new SchemaChangeResultRequest());
+ sendRequestToCoordinator(new SchemaChangeResultRequest(nonce));
long nextRpcTimeOutMillis = System.currentTimeMillis() + rpcTimeOutInMillis;
while (coordinationResponse instanceof SchemaChangeProcessingResponse) {
if (System.currentTimeMillis() < nextRpcTimeOutMillis) {
Thread.sleep(1000);
- coordinationResponse = sendRequestToCoordinator(new SchemaChangeResultRequest());
+ coordinationResponse =
+ sendRequestToCoordinator(new SchemaChangeResultRequest(nonce));
} else {
throw new TimeoutException("TimeOut when requesting release upstream");
}
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java
index b7223c633b6..9c13b9ed5fe 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java
@@ -180,13 +180,12 @@ public void handleEventFromOperator(int subtask, int attemptNumber, OperatorEven
if (event instanceof FlushSuccessEvent) {
FlushSuccessEvent flushSuccessEvent = (FlushSuccessEvent) event;
LOG.info(
- "Sink subtask {} succeed flushing for table {}.",
+ "Sink subtask {} succeed flushing for table {} (nonce: {}).",
flushSuccessEvent.getSubtask(),
- flushSuccessEvent.getTableId().toString());
+ flushSuccessEvent.getTableId().toString(),
+ flushSuccessEvent.getNonce());
requestHandler.flushSuccess(
- flushSuccessEvent.getTableId(),
- flushSuccessEvent.getSubtask(),
- currentParallelism);
+ flushSuccessEvent.getSubtask(), flushSuccessEvent.getNonce());
} else if (event instanceof SinkWriterRegisterEvent) {
requestHandler.registerSinkWriter(
((SinkWriterRegisterEvent) event).getSubtask());
@@ -273,7 +272,8 @@ public CompletableFuture handleCoordinationRequest(
requestHandler.handleSchemaChangeRequest(
schemaChangeRequest, responseFuture);
} else if (request instanceof SchemaChangeResultRequest) {
- requestHandler.getSchemaChangeResult(responseFuture);
+ requestHandler.getSchemaChangeResult(
+ (SchemaChangeResultRequest) request, responseFuture);
} else if (request instanceof GetEvolvedSchemaRequest) {
handleGetEvolvedSchemaRequest(
((GetEvolvedSchemaRequest) request), responseFuture);
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java
index b2a8a822df2..2faba42caef 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java
@@ -37,6 +37,7 @@
import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeProcessingResponse;
import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeRequest;
import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeResponse;
+import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeResultRequest;
import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeResultResponse;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
@@ -80,16 +81,14 @@ public class SchemaRegistryRequestHandler implements Closeable {
*/
private volatile RequestStatus schemaChangeStatus;
- private final List pendingSubTaskIds;
private final Object schemaChangeRequestLock;
private volatile Throwable currentChangeException;
- private volatile List currentDerivedSchemaChangeEvents;
private volatile List currentFinishedSchemaChanges;
private volatile List currentIgnoredSchemaChanges;
/** Sink writers which have sent flush success events for the request. */
- private final Set flushedSinkWriters;
+ private final ConcurrentHashMap> flushedSinkWriters;
/** Executor service to execute schema change. */
private final ExecutorService schemaChangeThreadPool;
@@ -98,6 +97,8 @@ public class SchemaRegistryRequestHandler implements Closeable {
private final OperatorCoordinator.Context context;
+ private final int parallelism;
+
public SchemaRegistryRequestHandler(
MetadataApplier metadataApplier,
SchemaManager schemaManager,
@@ -111,16 +112,18 @@ public SchemaRegistryRequestHandler(
this.context = context;
this.activeSinkWriters = ConcurrentHashMap.newKeySet();
- this.flushedSinkWriters = ConcurrentHashMap.newKeySet();
+ this.flushedSinkWriters = new ConcurrentHashMap<>();
this.schemaChangeThreadPool = Executors.newSingleThreadExecutor();
- this.currentDerivedSchemaChangeEvents = new ArrayList<>();
this.currentFinishedSchemaChanges = new ArrayList<>();
this.currentIgnoredSchemaChanges = new ArrayList<>();
this.schemaChangeStatus = RequestStatus.IDLE;
- this.pendingSubTaskIds = new ArrayList<>();
this.schemaChangeRequestLock = new Object();
+
+ // This check is meant to allow migration test pass since we don't have a valid
+ // `OperatorCoordinator.Context` in mocked environment.
+ this.parallelism = context != null ? context.currentParallelism() : 0;
}
/**
@@ -131,53 +134,45 @@ public SchemaRegistryRequestHandler(
public void handleSchemaChangeRequest(
SchemaChangeRequest request, CompletableFuture response) {
- // We use requester subTask ID as the pending ticket, because there will be at most 1 schema
- // change requests simultaneously from each subTask
- int requestSubTaskId = request.getSubTaskId();
+ // We use nonce to identify each schema change request
+ long nonce = request.getNonce();
synchronized (schemaChangeRequestLock) {
// Make sure we handle the first request in the pending list to avoid out-of-order
// waiting and blocks checkpointing mechanism.
if (schemaChangeStatus == RequestStatus.IDLE) {
- if (pendingSubTaskIds.isEmpty()) {
- LOG.info(
- "Received schema change event request {} from table {} from subTask {}. Pending list is empty, handling this.",
- request.getSchemaChangeEvent(),
- request.getTableId().toString(),
- requestSubTaskId);
- } else if (pendingSubTaskIds.get(0) == requestSubTaskId) {
- LOG.info(
- "Received schema change event request {} from table {} from subTask {}. It is on the first of the pending list, handling this.",
- request.getSchemaChangeEvent(),
- request.getTableId().toString(),
- requestSubTaskId);
- pendingSubTaskIds.remove(0);
- } else {
- LOG.info(
- "Received schema change event request {} from table {} from subTask {}. It is not the first of the pending list ({}).",
- request.getSchemaChangeEvent(),
- request.getTableId().toString(),
- requestSubTaskId,
- pendingSubTaskIds);
- if (!pendingSubTaskIds.contains(requestSubTaskId)) {
- pendingSubTaskIds.add(requestSubTaskId);
- }
- response.complete(wrap(SchemaChangeResponse.busy()));
- return;
- }
-
SchemaChangeEvent event = request.getSchemaChangeEvent();
// If this schema change event has been requested by another subTask, ignore it.
if (schemaManager.isOriginalSchemaChangeEventRedundant(event)) {
LOG.info("Event {} has been addressed before, ignoring it.", event);
- clearCurrentSchemaChangeRequest();
+ clearCurrentSchemaChangeRequest(nonce);
LOG.info(
- "SchemaChangeStatus switched from WAITING_FOR_FLUSH to IDLE for request {} due to duplicated request.",
+ "SchemaChangeStatus is still IDLE for request {} due to duplicated request.",
request);
response.complete(wrap(SchemaChangeResponse.duplicate()));
return;
}
+
+ if (activeSinkWriters.size() < parallelism) {
+ LOG.info(
+ "Not all active sink writers have been registered. Current {}, expected {}.",
+ activeSinkWriters.size(),
+ parallelism);
+ response.complete(wrap(SchemaChangeResponse.waitingForFlush()));
+ return;
+ }
+
+ if (!activeSinkWriters.equals(flushedSinkWriters.get(nonce))) {
+ LOG.info(
+ "Not all active sink writers have completed flush (nonce: {}). Flushed writers: {}, expected: {}.",
+ nonce,
+ flushedSinkWriters.get(nonce),
+ activeSinkWriters);
+ response.complete(wrap(SchemaChangeResponse.waitingForFlush()));
+ return;
+ }
+
schemaManager.applyOriginalSchemaChange(event);
List derivedSchemaChangeEvents =
calculateDerivedSchemaChangeEvents(request.getSchemaChangeEvent());
@@ -186,9 +181,9 @@ public void handleSchemaChangeRequest(
// route strategies, ignore it.
if (derivedSchemaChangeEvents.isEmpty()) {
LOG.info("Event {} is omitted from sending to downstream, ignoring it.", event);
- clearCurrentSchemaChangeRequest();
+ clearCurrentSchemaChangeRequest(nonce);
LOG.info(
- "SchemaChangeStatus switched from WAITING_FOR_FLUSH to IDLE for request {} due to ignored request.",
+ "SchemaChangeStatus is still IDLE for request {} due to ignored request.",
request);
response.complete(wrap(SchemaChangeResponse.ignored()));
@@ -196,9 +191,9 @@ public void handleSchemaChangeRequest(
}
LOG.info(
- "SchemaChangeStatus switched from IDLE to WAITING_FOR_FLUSH, other requests will be blocked.");
+ "SchemaChangeStatus switched from IDLE to APPLYING, other requests will be blocked.");
// This request has been accepted.
- schemaChangeStatus = RequestStatus.WAITING_FOR_FLUSH;
+ schemaChangeStatus = RequestStatus.APPLYING;
// Backfill pre-schema info for sink applying
derivedSchemaChangeEvents.forEach(
@@ -213,18 +208,16 @@ public void handleSchemaChangeRequest(
}
}
});
- currentDerivedSchemaChangeEvents = new ArrayList<>(derivedSchemaChangeEvents);
response.complete(wrap(SchemaChangeResponse.accepted(derivedSchemaChangeEvents)));
- } else {
+
LOG.info(
- "Schema Registry is busy processing a schema change request, could not handle request {} for now. Added {} to pending list ({}).",
- request,
- requestSubTaskId,
- pendingSubTaskIds);
- if (!pendingSubTaskIds.contains(requestSubTaskId)) {
- pendingSubTaskIds.add(requestSubTaskId);
- }
+ "All sink subtask have flushed for table {}. Start to apply schema change request {}.",
+ request.getTableId().toString(),
+ request);
+ schemaChangeThreadPool.submit(
+ () -> applySchemaChange(request.getTableId(), derivedSchemaChangeEvents));
+ } else {
response.complete(wrap(SchemaChangeResponse.busy()));
}
}
@@ -277,9 +270,7 @@ private void applySchemaChange(
"Illegal schemaChangeStatus state: should be APPLYING before applySchemaChange finishes, not "
+ schemaChangeStatus);
schemaChangeStatus = RequestStatus.FINISHED;
- LOG.info(
- "SchemaChangeStatus switched from APPLYING to FINISHED for request {}.",
- currentDerivedSchemaChangeEvents);
+ LOG.info("SchemaChangeStatus switched from APPLYING to FINISHED.");
}
/**
@@ -295,45 +286,31 @@ public void registerSinkWriter(int sinkSubtask) {
/**
* Record flushed sink subtasks after receiving FlushSuccessEvent.
*
- * @param tableId the subtask in SchemaOperator and table that the FlushEvent is about
* @param sinkSubtask the sink subtask succeed flushing
*/
- public void flushSuccess(TableId tableId, int sinkSubtask, int parallelism) {
- flushedSinkWriters.add(sinkSubtask);
- if (activeSinkWriters.size() < parallelism) {
- LOG.info(
- "Not all active sink writers have been registered. Current {}, expected {}.",
- activeSinkWriters.size(),
- parallelism);
- return;
- }
- if (flushedSinkWriters.equals(activeSinkWriters)) {
- Preconditions.checkState(
- schemaChangeStatus == RequestStatus.WAITING_FOR_FLUSH,
- "Illegal schemaChangeStatus state: should be WAITING_FOR_FLUSH before collecting enough FlushEvents, not "
- + schemaChangeStatus);
-
- schemaChangeStatus = RequestStatus.APPLYING;
- LOG.info(
- "All sink subtask have flushed for table {}. Start to apply schema change.",
- tableId.toString());
- schemaChangeThreadPool.submit(
- () -> applySchemaChange(tableId, currentDerivedSchemaChangeEvents));
+ public void flushSuccess(int sinkSubtask, long nonce) {
+ synchronized (schemaChangeRequestLock) {
+ if (!flushedSinkWriters.containsKey(nonce)) {
+ flushedSinkWriters.put(nonce, ConcurrentHashMap.newKeySet());
+ }
+ flushedSinkWriters.get(nonce).add(sinkSubtask);
}
}
- public void getSchemaChangeResult(CompletableFuture response) {
+ public void getSchemaChangeResult(
+ SchemaChangeResultRequest request, CompletableFuture response) {
Preconditions.checkState(
schemaChangeStatus != RequestStatus.IDLE,
"Illegal schemaChangeStatus: should not be IDLE before getting schema change request results.");
if (schemaChangeStatus == RequestStatus.FINISHED) {
schemaChangeStatus = RequestStatus.IDLE;
LOG.info(
- "SchemaChangeStatus switched from FINISHED to IDLE for request {}",
- currentDerivedSchemaChangeEvents);
+ "SchemaChangeStatus switched from FINISHED to IDLE. (nonce: {})",
+ request.getNonce());
// This request has been finished, return it and prepare for the next request
- List finishedEvents = clearCurrentSchemaChangeRequest();
+ List finishedEvents =
+ clearCurrentSchemaChangeRequest(request.getNonce());
SchemaChangeResultResponse resultResponse =
new SchemaChangeResultResponse(finishedEvents);
response.complete(wrap(resultResponse));
@@ -470,15 +447,14 @@ private boolean shouldIgnoreException(Throwable throwable) {
&& (schemaChangeBehavior == SchemaChangeBehavior.TRY_EVOLVE);
}
- private List clearCurrentSchemaChangeRequest() {
+ private List clearCurrentSchemaChangeRequest(long nonce) {
if (currentChangeException != null) {
context.failJob(
new RuntimeException("Failed to apply schema change.", currentChangeException));
}
List finishedSchemaChanges =
new ArrayList<>(currentFinishedSchemaChanges);
- flushedSinkWriters.clear();
- currentDerivedSchemaChangeEvents.clear();
+ flushedSinkWriters.remove(nonce);
currentFinishedSchemaChanges.clear();
currentIgnoredSchemaChanges.clear();
currentChangeException = null;
@@ -487,30 +463,24 @@ private List clearCurrentSchemaChangeRequest() {
// Schema change event state could transfer in the following way:
//
- // -------- B --------
- // | |
- // v |
- // -------- ---------------------
- // | IDLE | --- A --> | WAITING_FOR_FLUSH |
- // -------- ---------------------
+
+ // --------
+ // | IDLE | -------------------A
+ // -------- |
// ^ |
- // E C
+ // C |
// \ v
// ------------ ------------
- // | FINISHED | <-- D -- | APPLYING |
+ // | FINISHED | <-- B -- | APPLYING |
// ------------ ------------
//
- // A: When a request came to an idling request handler.
- // B: When current request is duplicate or ignored by LENIENT / routed table merging
- // strategies.
- // C: When schema registry collected enough flush success events, and actually started to apply
- // schema changes.
- // D: When schema change application finishes (successfully or with exceptions)
- // E: When current schema change request result has been retrieved by SchemaOperator, and ready
- // for the next request.
+ // A: When a request came to an idling request handler. Only possible when registry is IDLE,
+ // and it has collected all FlushEvents from sink writers.
+ // B: When schema change application finishes (successfully or with exceptions)
+ // C: When current schema change request result has been retrieved by SchemaOperator, and be
+ // ready for the next request.
private enum RequestStatus {
IDLE,
- WAITING_FOR_FLUSH,
APPLYING,
FINISHED
}
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/FlushSuccessEvent.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/FlushSuccessEvent.java
index 72951446456..6959e883b85 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/FlushSuccessEvent.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/FlushSuccessEvent.java
@@ -36,9 +36,16 @@ public class FlushSuccessEvent implements OperatorEvent {
/** The schema changes from which table is executing it. */
private final TableId tableId;
- public FlushSuccessEvent(int subtask, TableId tableId) {
+ /**
+ * Nonce code to distinguish flush events corresponding to each schema change event from
+ * different subTasks.
+ */
+ private final long nonce;
+
+ public FlushSuccessEvent(int subtask, TableId tableId, long nonce) {
this.subtask = subtask;
this.tableId = tableId;
+ this.nonce = nonce;
}
public int getSubtask() {
@@ -49,6 +56,10 @@ public TableId getTableId() {
return tableId;
}
+ public long getNonce() {
+ return nonce;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -58,11 +69,13 @@ public boolean equals(Object o) {
return false;
}
FlushSuccessEvent that = (FlushSuccessEvent) o;
- return subtask == that.subtask && Objects.equals(tableId, that.tableId);
+ return subtask == that.subtask
+ && Objects.equals(tableId, that.tableId)
+ && nonce == that.nonce;
}
@Override
public int hashCode() {
- return Objects.hash(subtask, tableId);
+ return Objects.hash(subtask, tableId, nonce);
}
}
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeRequest.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeRequest.java
index fbc5e9c4037..fba1e505577 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeRequest.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeRequest.java
@@ -38,12 +38,18 @@ public class SchemaChangeRequest implements CoordinationRequest {
private final SchemaChangeEvent schemaChangeEvent;
/** The ID of subTask that initiated the request. */
private final int subTaskId;
+ /**
+ * Nonce code to distinguish flush events corresponding to each schema change event from
+ * different subTasks.
+ */
+ private final long nonce;
public SchemaChangeRequest(
- TableId tableId, SchemaChangeEvent schemaChangeEvent, int subTaskId) {
+ TableId tableId, SchemaChangeEvent schemaChangeEvent, int subTaskId, long nonce) {
this.tableId = tableId;
this.schemaChangeEvent = schemaChangeEvent;
this.subTaskId = subTaskId;
+ this.nonce = nonce;
}
public TableId getTableId() {
@@ -58,6 +64,10 @@ public int getSubTaskId() {
return subTaskId;
}
+ public long getNonce() {
+ return nonce;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -69,11 +79,12 @@ public boolean equals(Object o) {
SchemaChangeRequest that = (SchemaChangeRequest) o;
return Objects.equals(tableId, that.tableId)
&& Objects.equals(schemaChangeEvent, that.schemaChangeEvent)
- && subTaskId == that.subTaskId;
+ && subTaskId == that.subTaskId
+ && nonce == that.nonce;
}
@Override
public int hashCode() {
- return Objects.hash(tableId, schemaChangeEvent, subTaskId);
+ return Objects.hash(tableId, schemaChangeEvent, subTaskId, nonce);
}
}
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeResponse.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeResponse.java
index 63d57139b10..bea0ce65ce1 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeResponse.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeResponse.java
@@ -57,6 +57,10 @@ public static SchemaChangeResponse ignored() {
return new SchemaChangeResponse(Collections.emptyList(), ResponseCode.IGNORED);
}
+ public static SchemaChangeResponse waitingForFlush() {
+ return new SchemaChangeResponse(Collections.emptyList(), ResponseCode.WAITING_FOR_FLUSH);
+ }
+
private SchemaChangeResponse(
List schemaChangeEvents, ResponseCode responseCode) {
this.schemaChangeEvents = schemaChangeEvents;
@@ -79,6 +83,10 @@ public boolean isIgnored() {
return ResponseCode.IGNORED.equals(responseCode);
}
+ public boolean isWaitingForFlush() {
+ return ResponseCode.WAITING_FOR_FLUSH.equals(responseCode);
+ }
+
public List getSchemaChangeEvents() {
return schemaChangeEvents;
}
@@ -129,6 +137,7 @@ public enum ResponseCode {
ACCEPTED,
BUSY,
DUPLICATE,
- IGNORED
+ IGNORED,
+ WAITING_FOR_FLUSH
}
}
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeResultRequest.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeResultRequest.java
index e53762c199f..8160494ce9f 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeResultRequest.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeResultRequest.java
@@ -28,4 +28,36 @@
public class SchemaChangeResultRequest implements CoordinationRequest {
private static final long serialVersionUID = 1L;
+
+ /**
+ * Nonce code to distinguish flush events corresponding to each schema change event from
+ * different subTasks.
+ */
+ private final long nonce;
+
+ public SchemaChangeResultRequest(long nonce) {
+ this.nonce = nonce;
+ }
+
+ public long getNonce() {
+ return nonce;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ SchemaChangeResultRequest that = (SchemaChangeResultRequest) o;
+ return nonce == that.nonce;
+ }
+
+ @Override
+ public int hashCode() {
+ return Long.hashCode(nonce);
+ }
}
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkFunctionOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkFunctionOperator.java
index 3767ba32123..b7f8f217faa 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkFunctionOperator.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkFunctionOperator.java
@@ -113,7 +113,7 @@ public void processElement(StreamRecord element) throws Exception {
private void handleFlushEvent(FlushEvent event) throws Exception {
userFunction.finish();
schemaEvolutionClient.notifyFlushSuccess(
- getRuntimeContext().getIndexOfThisSubtask(), event.getTableId());
+ getRuntimeContext().getIndexOfThisSubtask(), event.getTableId(), event.getNonce());
}
private void emitLatestSchema(TableId tableId) throws Exception {
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkWriterOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkWriterOperator.java
index 554d7970272..a3719f91c16 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkWriterOperator.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkWriterOperator.java
@@ -199,7 +199,7 @@ public void endInput() throws Exception {
private void handleFlushEvent(FlushEvent event) throws Exception {
copySinkWriter.flush(false);
schemaEvolutionClient.notifyFlushSuccess(
- getRuntimeContext().getIndexOfThisSubtask(), event.getTableId());
+ getRuntimeContext().getIndexOfThisSubtask(), event.getTableId(), event.getNonce());
}
private void emitLatestSchema(TableId tableId) throws Exception {
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/SchemaEvolutionClient.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/SchemaEvolutionClient.java
index aa256787f16..0e7df4a2446 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/SchemaEvolutionClient.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/SchemaEvolutionClient.java
@@ -60,9 +60,10 @@ public void registerSubtask(int subtask) throws IOException {
}
/** send {@link FlushSuccessEvent} to {@link SchemaRegistry}. */
- public void notifyFlushSuccess(int subtask, TableId tableId) throws IOException {
+ public void notifyFlushSuccess(int subtask, TableId tableId, long nonce) throws IOException {
toCoordinator.sendOperatorEventToCoordinator(
- schemaOperatorID, new SerializedValue<>(new FlushSuccessEvent(subtask, tableId)));
+ schemaOperatorID,
+ new SerializedValue<>(new FlushSuccessEvent(subtask, tableId, nonce)));
}
public Optional getLatestEvolvedSchema(TableId tableId) throws Exception {
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/event/EventSerializer.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/event/EventSerializer.java
index c8f34020dfe..2c10ad73c49 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/event/EventSerializer.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/event/EventSerializer.java
@@ -61,7 +61,9 @@ public Event createInstance() {
@Override
public Event copy(Event from) {
if (from instanceof FlushEvent) {
- return new FlushEvent(tableIdSerializer.copy(((FlushEvent) from).getTableId()));
+ return new FlushEvent(
+ tableIdSerializer.copy(((FlushEvent) from).getTableId()),
+ ((FlushEvent) from).getNonce());
} else if (from instanceof SchemaChangeEvent) {
return schemaChangeEventSerializer.copy((SchemaChangeEvent) from);
} else if (from instanceof DataChangeEvent) {
@@ -85,6 +87,7 @@ public void serialize(Event record, DataOutputView target) throws IOException {
if (record instanceof FlushEvent) {
enumSerializer.serialize(EventClass.FLUSH_EVENT, target);
tableIdSerializer.serialize(((FlushEvent) record).getTableId(), target);
+ target.writeLong(((FlushEvent) record).getNonce());
} else if (record instanceof SchemaChangeEvent) {
enumSerializer.serialize(EventClass.SCHEME_CHANGE_EVENT, target);
schemaChangeEventSerializer.serialize((SchemaChangeEvent) record, target);
@@ -101,7 +104,7 @@ public Event deserialize(DataInputView source) throws IOException {
EventClass eventClass = enumSerializer.deserialize(source);
switch (eventClass) {
case FLUSH_EVENT:
- return new FlushEvent(tableIdSerializer.deserialize(source));
+ return new FlushEvent(tableIdSerializer.deserialize(source), source.readLong());
case DATA_CHANGE_EVENT:
return dataChangeEventSerializer.deserialize(source);
case SCHEME_CHANGE_EVENT:
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/typeutils/NonceUtils.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/typeutils/NonceUtils.java
new file mode 100644
index 00000000000..0d9e0b74643
--- /dev/null
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/typeutils/NonceUtils.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.runtime.typeutils;
+
+import org.apache.flink.cdc.common.annotation.PublicEvolving;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.event.FlushEvent;
+import org.apache.flink.cdc.common.event.TableId;
+
+import java.util.Objects;
+
+/**
+ * Generates schema evolution nonce value and corresponding {@link FlushEvent}s. It is guaranteed to
+ * be unique by combining epoch timestamp, subTaskId, Table ID and schema change event into a long
+ * hashCode.
+ */
+@PublicEvolving
+public class NonceUtils {
+
+ /**
+ * Generating a nonce value with current @{code timestamp}, {@code subTaskId}, {@code tableId},
+ * and {@code schemaChangeEvent}. The higher 32 bits are current UTC timestamp in epoch seconds,
+ * and the lower 32 bits are Java hashCode of the rest parameters.
+ */
+ public static long generateNonce(
+ int timestamp, int subTaskId, TableId tableId, Event schemaChangeEvent) {
+ return (long) timestamp << 32
+ | Integer.toUnsignedLong(Objects.hash(subTaskId, tableId, schemaChangeEvent));
+ }
+
+ /** Generating a {@link FlushEvent} carrying a nonce. */
+ public static FlushEvent generateFlushEvent(
+ int timestamp, int subTaskId, TableId tableId, Event schemaChangeEvent) {
+ return new FlushEvent(
+ tableId, generateNonce(timestamp, subTaskId, tableId, schemaChangeEvent));
+ }
+}
diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/SchemaEvolveTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/SchemaEvolveTest.java
index e1f6a94c96d..d60b6d55a9d 100644
--- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/SchemaEvolveTest.java
+++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/SchemaEvolveTest.java
@@ -25,13 +25,13 @@
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.DropColumnEvent;
import org.apache.flink.cdc.common.event.Event;
-import org.apache.flink.cdc.common.event.FlushEvent;
import org.apache.flink.cdc.common.event.RenameColumnEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEventType;
import org.apache.flink.cdc.common.event.SchemaChangeEventTypeFamily;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.exceptions.UnsupportedSchemaChangeEventException;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
+import org.apache.flink.cdc.common.route.RouteRule;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.DataType;
@@ -39,6 +39,7 @@
import org.apache.flink.cdc.common.types.RowType;
import org.apache.flink.cdc.runtime.testutils.operators.EventOperatorTestHarness;
import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
+import org.apache.flink.cdc.runtime.typeutils.NonceUtils;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap;
@@ -69,6 +70,26 @@ public class SchemaEvolveTest {
private static final TableId CUSTOMERS_TABLE_ID =
TableId.tableId("my_company", "my_branch", "customers");
+ /**
+ * A mocked schema operator that always yields predictable nonce with incrementing timestamp.
+ */
+ public static class MockedSchemaOperator extends SchemaOperator {
+
+ public MockedSchemaOperator(
+ List routingRules,
+ Duration rpcTimeOut,
+ SchemaChangeBehavior schemaChangeBehavior) {
+ super(routingRules, rpcTimeOut, schemaChangeBehavior);
+ }
+
+ private int i = 0;
+
+ @Override
+ protected int getCurrentTimestamp() {
+ return ++i;
+ }
+ }
+
/** Tests common evolve schema changes without exceptions. */
@Test
public void testEvolveSchema() throws Exception {
@@ -84,7 +105,7 @@ public void testEvolveSchema() throws Exception {
SchemaChangeBehavior behavior = SchemaChangeBehavior.EVOLVE;
SchemaOperator schemaOperator =
- new SchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), behavior);
+ new MockedSchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), behavior);
EventOperatorTestHarness harness =
new EventOperatorTestHarness<>(schemaOperator, 17, Duration.ofSeconds(3), behavior);
harness.open();
@@ -108,7 +129,12 @@ public void testEvolveSchema() throws Exception {
Assertions.assertThat(
ListUtils.union(
- Collections.singletonList(new FlushEvent(tableId)),
+ Collections.singletonList(
+ NonceUtils.generateFlushEvent(
+ 1,
+ 0,
+ tableId,
+ createAndInsertDataEvents.get(0))),
createAndInsertDataEvents))
.isEqualTo(
harness.getOutputRecords().stream()
@@ -168,7 +194,9 @@ public void testEvolveSchema() throws Exception {
.collect(Collectors.toList()))
.isEqualTo(
ListUtils.union(
- Collections.singletonList(new FlushEvent(tableId)),
+ Collections.singletonList(
+ NonceUtils.generateFlushEvent(
+ 2, 0, tableId, addColumnEvents.get(0))),
addColumnEvents));
Schema schemaV2 =
@@ -227,7 +255,9 @@ public void testEvolveSchema() throws Exception {
.collect(Collectors.toList()))
.isEqualTo(
ListUtils.union(
- Collections.singletonList(new FlushEvent(tableId)),
+ Collections.singletonList(
+ NonceUtils.generateFlushEvent(
+ 3, 0, tableId, renameColumnEvents.get(0))),
renameColumnEvents));
Schema schemaV3 =
@@ -272,7 +302,9 @@ public void testEvolveSchema() throws Exception {
.collect(Collectors.toList()))
.isEqualTo(
ListUtils.union(
- Collections.singletonList(new FlushEvent(tableId)),
+ Collections.singletonList(
+ NonceUtils.generateFlushEvent(
+ 4, 0, tableId, alterColumnTypeEvents.get(0))),
alterColumnTypeEvents));
Schema schemaV4 =
@@ -308,7 +340,9 @@ tableId, buildRecord(INT, 12, STRING, "Jane", FLOAT, 11f)),
.collect(Collectors.toList()))
.isEqualTo(
ListUtils.union(
- Collections.singletonList(new FlushEvent(tableId)),
+ Collections.singletonList(
+ NonceUtils.generateFlushEvent(
+ 5, 0, tableId, dropColumnEvents.get(0))),
dropColumnEvents));
Schema schemaV5 =
@@ -341,7 +375,7 @@ public void testTryEvolveSchema() throws Exception {
SchemaChangeBehavior behavior = SchemaChangeBehavior.TRY_EVOLVE;
SchemaOperator schemaOperator =
- new SchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), behavior);
+ new MockedSchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), behavior);
EventOperatorTestHarness harness =
new EventOperatorTestHarness<>(schemaOperator, 17, Duration.ofSeconds(3), behavior);
harness.open();
@@ -369,7 +403,12 @@ public void testTryEvolveSchema() throws Exception {
.collect(Collectors.toList()))
.isEqualTo(
ListUtils.union(
- Collections.singletonList(new FlushEvent(tableId)),
+ Collections.singletonList(
+ NonceUtils.generateFlushEvent(
+ 1,
+ 0,
+ tableId,
+ createAndInsertDataEvents.get(0))),
createAndInsertDataEvents));
Assertions.assertThat(harness.getLatestOriginalSchema(tableId)).isEqualTo(schemaV1);
@@ -425,7 +464,9 @@ public void testTryEvolveSchema() throws Exception {
.collect(Collectors.toList()))
.isEqualTo(
ListUtils.union(
- Collections.singletonList(new FlushEvent(tableId)),
+ Collections.singletonList(
+ NonceUtils.generateFlushEvent(
+ 2, 0, tableId, addColumnEvents.get(0))),
addColumnEvents));
Schema schemaV2 =
@@ -484,7 +525,9 @@ public void testTryEvolveSchema() throws Exception {
.collect(Collectors.toList()))
.isEqualTo(
ListUtils.union(
- Collections.singletonList(new FlushEvent(tableId)),
+ Collections.singletonList(
+ NonceUtils.generateFlushEvent(
+ 3, 0, tableId, renameColumnEvents.get(0))),
renameColumnEvents));
Schema schemaV3 =
@@ -529,7 +572,9 @@ public void testTryEvolveSchema() throws Exception {
.collect(Collectors.toList()))
.isEqualTo(
ListUtils.union(
- Collections.singletonList(new FlushEvent(tableId)),
+ Collections.singletonList(
+ NonceUtils.generateFlushEvent(
+ 4, 0, tableId, alterColumnTypeEvents.get(0))),
alterColumnTypeEvents));
Schema schemaV4 =
@@ -565,7 +610,9 @@ tableId, buildRecord(INT, 12, STRING, "Jane", FLOAT, 11f)),
.collect(Collectors.toList()))
.isEqualTo(
ListUtils.union(
- Collections.singletonList(new FlushEvent(tableId)),
+ Collections.singletonList(
+ NonceUtils.generateFlushEvent(
+ 5, 0, tableId, dropColumnEvents.get(0))),
dropColumnEvents));
Schema schemaV5 =
@@ -598,7 +645,7 @@ public void testExceptionEvolveSchema() throws Exception {
SchemaChangeBehavior behavior = SchemaChangeBehavior.EXCEPTION;
SchemaOperator schemaOperator =
- new SchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), behavior);
+ new MockedSchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), behavior);
EventOperatorTestHarness harness =
new EventOperatorTestHarness<>(schemaOperator, 17, Duration.ofSeconds(3), behavior);
harness.open();
@@ -626,7 +673,12 @@ public void testExceptionEvolveSchema() throws Exception {
.collect(Collectors.toList()))
.isEqualTo(
ListUtils.union(
- Collections.singletonList(new FlushEvent(tableId)),
+ Collections.singletonList(
+ NonceUtils.generateFlushEvent(
+ 1,
+ 0,
+ tableId,
+ createAndInsertDataEvents.get(0))),
createAndInsertDataEvents));
Assertions.assertThat(harness.getLatestOriginalSchema(tableId)).isEqualTo(schemaV1);
@@ -707,7 +759,7 @@ public void testIgnoreEvolveSchema() throws Exception {
SchemaChangeBehavior behavior = SchemaChangeBehavior.IGNORE;
SchemaOperator schemaOperator =
- new SchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), behavior);
+ new MockedSchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), behavior);
EventOperatorTestHarness harness =
new EventOperatorTestHarness<>(schemaOperator, 17, Duration.ofSeconds(3), behavior);
harness.open();
@@ -735,7 +787,12 @@ public void testIgnoreEvolveSchema() throws Exception {
.collect(Collectors.toList()))
.isEqualTo(
ListUtils.union(
- Collections.singletonList(new FlushEvent(tableId)),
+ Collections.singletonList(
+ NonceUtils.generateFlushEvent(
+ 1,
+ 0,
+ tableId,
+ createAndInsertDataEvents.get(0))),
createAndInsertDataEvents));
Assertions.assertThat(harness.getLatestOriginalSchema(tableId)).isEqualTo(schemaV1);
@@ -787,7 +844,7 @@ public void testIgnoreEvolveSchema() throws Exception {
List expectedEvents =
Arrays.asList(
- new FlushEvent(tableId),
+ NonceUtils.generateFlushEvent(2, 0, tableId, addColumnEvents.get(0)),
DataChangeEvent.insertEvent(
tableId,
buildRecord(INT, 4, STRING, "Derrida", SMALLINT, (short) 20)),
@@ -855,7 +912,7 @@ public void testIgnoreEvolveSchema() throws Exception {
List expectedEvents =
Arrays.asList(
- new FlushEvent(tableId),
+ NonceUtils.generateFlushEvent(3, 0, tableId, renameColumnEvents.get(0)),
DataChangeEvent.insertEvent(
tableId,
buildRecord(INT, 6, STRING, null, SMALLINT, (short) 22)),
@@ -904,7 +961,8 @@ public void testIgnoreEvolveSchema() throws Exception {
List expectedEvents =
Arrays.asList(
- new FlushEvent(tableId),
+ NonceUtils.generateFlushEvent(
+ 4, 0, tableId, alterColumnTypeEvents.get(0)),
DataChangeEvent.insertEvent(
tableId, buildRecord(INT, 8, STRING, null, SMALLINT, null)),
DataChangeEvent.insertEvent(
@@ -945,7 +1003,7 @@ tableId, buildRecord(INT, 12, STRING, "Jane", FLOAT, 11f)),
List expectedEvents =
Arrays.asList(
- new FlushEvent(tableId),
+ NonceUtils.generateFlushEvent(5, 0, tableId, dropColumnEvents.get(0)),
DataChangeEvent.insertEvent(
tableId, buildRecord(INT, 12, STRING, null, DOUBLE, null)),
DataChangeEvent.insertEvent(
@@ -986,7 +1044,7 @@ public void testEvolveSchemaWithFailure() throws Exception {
SchemaChangeBehavior behavior = SchemaChangeBehavior.EVOLVE;
SchemaOperator schemaOperator =
- new SchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), behavior);
+ new MockedSchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), behavior);
EventOperatorTestHarness harness =
new EventOperatorTestHarness<>(
schemaOperator,
@@ -1021,7 +1079,9 @@ tableId, buildRecord(INT, 2, STRING, "Bob", SMALLINT, (short) 18)),
.collect(Collectors.toList()))
.isEqualTo(
ListUtils.union(
- Collections.singletonList(new FlushEvent(tableId)),
+ Collections.singletonList(
+ NonceUtils.generateFlushEvent(
+ 1, 0, tableId, createAndInsertDataEvents.get(0))),
createAndInsertDataEvents));
Assertions.assertThat(harness.getLatestOriginalSchema(tableId)).isEqualTo(schemaV1);
@@ -1068,7 +1128,7 @@ public void testTryEvolveSchemaWithFailure() throws Exception {
SchemaChangeBehavior behavior = SchemaChangeBehavior.TRY_EVOLVE;
SchemaOperator schemaOperator =
- new SchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), behavior);
+ new MockedSchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), behavior);
// All types of schema change events will be sent to the sink
// AddColumn and RenameColumn events will always fail
@@ -1108,7 +1168,12 @@ public void testTryEvolveSchemaWithFailure() throws Exception {
.collect(Collectors.toList()))
.isEqualTo(
ListUtils.union(
- Collections.singletonList(new FlushEvent(tableId)),
+ Collections.singletonList(
+ NonceUtils.generateFlushEvent(
+ 1,
+ 0,
+ tableId,
+ createAndInsertDataEvents.get(0))),
createAndInsertDataEvents));
Assertions.assertThat(harness.getLatestOriginalSchema(tableId)).isEqualTo(schemaV1);
@@ -1159,7 +1224,8 @@ public void testTryEvolveSchemaWithFailure() throws Exception {
processEvent(schemaOperator, addColumnEvents);
List expectedEvents = new ArrayList<>();
- expectedEvents.add(new FlushEvent(tableId));
+ expectedEvents.add(
+ NonceUtils.generateFlushEvent(2, 0, tableId, addColumnEvents.get(0)));
expectedEvents.addAll(addColumnEvents);
Assertions.assertThat(
@@ -1221,7 +1287,8 @@ public void testTryEvolveSchemaWithFailure() throws Exception {
processEvent(schemaOperator, renameColumnEvents);
List expectedEvents = new ArrayList<>();
- expectedEvents.add(new FlushEvent(tableId));
+ expectedEvents.add(
+ NonceUtils.generateFlushEvent(3, 0, tableId, renameColumnEvents.get(0)));
expectedEvents.addAll(renameColumnEvents);
Assertions.assertThat(
@@ -1266,7 +1333,8 @@ public void testTryEvolveSchemaWithFailure() throws Exception {
List expectedEvents =
Arrays.asList(
- new FlushEvent(tableId),
+ NonceUtils.generateFlushEvent(
+ 4, 0, tableId, alterColumnTypeEvents.get(0)),
DataChangeEvent.insertEvent(
tableId,
buildRecord(
@@ -1324,7 +1392,7 @@ tableId, buildRecord(INT, 12, STRING, "Jane", FLOAT, 11f)),
List expectedEvents =
Arrays.asList(
- new FlushEvent(tableId),
+ NonceUtils.generateFlushEvent(5, 0, tableId, dropColumnEvents.get(0)),
DataChangeEvent.insertEvent(
tableId,
buildRecord(
@@ -1380,7 +1448,7 @@ public void testFineGrainedSchemaEvolves() throws Exception {
SchemaChangeBehavior behavior = SchemaChangeBehavior.EVOLVE;
SchemaOperator schemaOperator =
- new SchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), behavior);
+ new MockedSchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), behavior);
// All types of schema change events will be sent to the sink
// AddColumn and RenameColumn events will always fail
@@ -1420,7 +1488,12 @@ public void testFineGrainedSchemaEvolves() throws Exception {
.collect(Collectors.toList()))
.isEqualTo(
ListUtils.union(
- Collections.singletonList(new FlushEvent(tableId)),
+ Collections.singletonList(
+ NonceUtils.generateFlushEvent(
+ 1,
+ 0,
+ tableId,
+ createAndInsertDataEvents.get(0))),
createAndInsertDataEvents));
Assertions.assertThat(harness.getLatestOriginalSchema(tableId)).isEqualTo(schemaV1);
@@ -1471,7 +1544,9 @@ public void testFineGrainedSchemaEvolves() throws Exception {
processEvent(schemaOperator, addColumnEvents);
List expectedEvents = new ArrayList<>();
- expectedEvents.add(new FlushEvent(tableId));
+ expectedEvents.add(
+ NonceUtils.generateFlushEvent(2, 0, tableId, addColumnEvents.get(0)));
+
expectedEvents.addAll(addColumnEvents);
Assertions.assertThat(
@@ -1533,7 +1608,8 @@ public void testFineGrainedSchemaEvolves() throws Exception {
processEvent(schemaOperator, renameColumnEvents);
List expectedEvents = new ArrayList<>();
- expectedEvents.add(new FlushEvent(tableId));
+ expectedEvents.add(
+ NonceUtils.generateFlushEvent(3, 0, tableId, renameColumnEvents.get(0)));
expectedEvents.addAll(renameColumnEvents);
Assertions.assertThat(
@@ -1578,7 +1654,8 @@ public void testFineGrainedSchemaEvolves() throws Exception {
List expectedEvents =
Arrays.asList(
- new FlushEvent(tableId),
+ NonceUtils.generateFlushEvent(
+ 4, 0, tableId, alterColumnTypeEvents.get(0)),
DataChangeEvent.insertEvent(
tableId,
buildRecord(
@@ -1636,7 +1713,7 @@ tableId, buildRecord(INT, 12, STRING, "Jane", FLOAT, 11f)),
List expectedEvents =
Arrays.asList(
- new FlushEvent(tableId),
+ NonceUtils.generateFlushEvent(5, 0, tableId, dropColumnEvents.get(0)),
DataChangeEvent.insertEvent(
tableId,
buildRecord(
@@ -1692,7 +1769,7 @@ public void testLenientSchemaEvolves() throws Exception {
SchemaChangeBehavior behavior = SchemaChangeBehavior.LENIENT;
SchemaOperator schemaOperator =
- new SchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), behavior);
+ new MockedSchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), behavior);
EventOperatorTestHarness harness =
new EventOperatorTestHarness<>(schemaOperator, 17, Duration.ofSeconds(3), behavior);
harness.open();
@@ -1720,7 +1797,12 @@ public void testLenientSchemaEvolves() throws Exception {
.collect(Collectors.toList()))
.isEqualTo(
ListUtils.union(
- Collections.singletonList(new FlushEvent(tableId)),
+ Collections.singletonList(
+ NonceUtils.generateFlushEvent(
+ 1,
+ 0,
+ tableId,
+ createAndInsertDataEvents.get(0))),
createAndInsertDataEvents));
Assertions.assertThat(harness.getLatestOriginalSchema(tableId)).isEqualTo(schemaV1);
@@ -1776,7 +1858,9 @@ public void testLenientSchemaEvolves() throws Exception {
.collect(Collectors.toList()))
.isEqualTo(
ListUtils.union(
- Collections.singletonList(new FlushEvent(tableId)),
+ Collections.singletonList(
+ NonceUtils.generateFlushEvent(
+ 2, 0, tableId, addColumnEvents.get(0))),
addColumnEvents));
Schema schemaV2 =
@@ -1884,7 +1968,9 @@ public void testLenientSchemaEvolves() throws Exception {
.collect(Collectors.toList()))
.isEqualTo(
ListUtils.union(
- Collections.singletonList(new FlushEvent(tableId)),
+ Collections.singletonList(
+ NonceUtils.generateFlushEvent(
+ 3, 0, tableId, renameColumnEvents.get(0))),
lenientRenameColumnEvents));
Schema schemaV3 =
@@ -1959,7 +2045,9 @@ public void testLenientSchemaEvolves() throws Exception {
.collect(Collectors.toList()))
.isEqualTo(
ListUtils.union(
- Collections.singletonList(new FlushEvent(tableId)),
+ Collections.singletonList(
+ NonceUtils.generateFlushEvent(
+ 4, 0, tableId, alterColumnTypeEvents.get(0))),
lenientAlterColumnTypeEvents));
Schema schemaV4 =
@@ -2018,7 +2106,12 @@ tableId, buildRecord(INT, 12, STRING, "Jane", FLOAT, 11f)),
harness.getOutputRecords().stream()
.map(StreamRecord::getValue)
.collect(Collectors.toList()))
- .isEqualTo(lenientDropColumnEvents);
+ .isEqualTo(
+ ListUtils.union(
+ Collections.singletonList(
+ NonceUtils.generateFlushEvent(
+ 5, 0, tableId, dropColumnEvents.get(0))),
+ lenientDropColumnEvents));
Schema schemaV5 =
Schema.newBuilder()
@@ -2062,7 +2155,7 @@ public void testLenientEvolveTweaks() throws Exception {
SchemaChangeBehavior behavior = SchemaChangeBehavior.LENIENT;
SchemaOperator schemaOperator =
- new SchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), behavior);
+ new MockedSchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), behavior);
EventOperatorTestHarness harness =
new EventOperatorTestHarness<>(schemaOperator, 17, Duration.ofSeconds(3), behavior);
harness.open();
@@ -2100,7 +2193,12 @@ public void testLenientEvolveTweaks() throws Exception {
.collect(Collectors.toList()))
.isEqualTo(
ListUtils.union(
- Collections.singletonList(new FlushEvent(tableId)),
+ Collections.singletonList(
+ NonceUtils.generateFlushEvent(
+ 1,
+ 0,
+ tableId,
+ createAndInsertDataEvents.get(0))),
createAndInsertDataEvents));
Assertions.assertThat(harness.getLatestOriginalSchema(tableId)).isEqualTo(schemaV1);
@@ -2142,7 +2240,9 @@ tableId, buildRecord(INT, 12, INT, 0, SMALLINT, (short) 11)),
.collect(Collectors.toList()))
.isEqualTo(
ListUtils.union(
- Collections.singletonList(new FlushEvent(tableId)),
+ Collections.singletonList(
+ NonceUtils.generateFlushEvent(
+ 2, 0, tableId, dropColumnEvents.get(0))),
lenientDropColumnEvents));
Schema schemaV2 =
@@ -2257,7 +2357,9 @@ tableId, buildRecord(INT, 12, INT, 0, SMALLINT, (short) 11)),
.collect(Collectors.toList()))
.isEqualTo(
ListUtils.union(
- Collections.singletonList(new FlushEvent(tableId)),
+ Collections.singletonList(
+ NonceUtils.generateFlushEvent(
+ 3, 0, tableId, addColumnEvents.get(0))),
lenientAddColumnEvents));
Schema schemaV3 =
@@ -2375,7 +2477,9 @@ tableId, buildRecord(INT, 12, INT, 0, SMALLINT, (short) 11)),
.collect(Collectors.toList()))
.isEqualTo(
ListUtils.union(
- Collections.singletonList(new FlushEvent(tableId)),
+ Collections.singletonList(
+ NonceUtils.generateFlushEvent(
+ 4, 0, tableId, renameColumnEvents.get(0))),
lenientRenameColumnEvents));
Schema schemaV4 =
diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/partitioning/PrePartitionOperatorTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/partitioning/PrePartitionOperatorTest.java
index 38c80914d39..413e180edba 100644
--- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/partitioning/PrePartitionOperatorTest.java
+++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/partitioning/PrePartitionOperatorTest.java
@@ -48,13 +48,15 @@ class PrePartitionOperatorTest {
.build();
private static final int DOWNSTREAM_PARALLELISM = 5;
+ private static final long TEST_NONCE = 123456789L;
+
@Test
void testBroadcastingSchemaChangeEvent() throws Exception {
try (EventOperatorTestHarness testHarness =
createTestHarness()) {
// Initialization
testHarness.open();
- testHarness.registerTableSchema(CUSTOMERS, CUSTOMERS_SCHEMA);
+ testHarness.registerTableSchema(CUSTOMERS, CUSTOMERS_SCHEMA, TEST_NONCE);
// CreateTableEvent
PrePartitionOperator operator = testHarness.getOperator();
@@ -74,11 +76,11 @@ void testBroadcastingFlushEvent() throws Exception {
createTestHarness()) {
// Initialization
testHarness.open();
- testHarness.registerTableSchema(CUSTOMERS, CUSTOMERS_SCHEMA);
+ testHarness.registerTableSchema(CUSTOMERS, CUSTOMERS_SCHEMA, TEST_NONCE);
// FlushEvent
PrePartitionOperator operator = testHarness.getOperator();
- FlushEvent flushEvent = new FlushEvent(CUSTOMERS);
+ FlushEvent flushEvent = new FlushEvent(CUSTOMERS, TEST_NONCE);
operator.processElement(new StreamRecord<>(flushEvent));
assertThat(testHarness.getOutputRecords()).hasSize(DOWNSTREAM_PARALLELISM);
for (int i = 0; i < DOWNSTREAM_PARALLELISM; i++) {
@@ -94,7 +96,7 @@ void testPartitioningDataChangeEvent() throws Exception {
createTestHarness()) {
// Initialization
testHarness.open();
- testHarness.registerTableSchema(CUSTOMERS, CUSTOMERS_SCHEMA);
+ testHarness.registerTableSchema(CUSTOMERS, CUSTOMERS_SCHEMA, 0L);
// DataChangeEvent
PrePartitionOperator operator = testHarness.getOperator();
diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/event/EventSerializerTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/event/EventSerializerTest.java
index a34447155fd..eadb12d5d54 100644
--- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/event/EventSerializerTest.java
+++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/event/EventSerializerTest.java
@@ -47,9 +47,9 @@ protected Class getTypeClass() {
protected Event[] getTestData() {
Event[] flushEvents =
new Event[] {
- new FlushEvent(TableId.tableId("table")),
- new FlushEvent(TableId.tableId("schema", "table")),
- new FlushEvent(TableId.tableId("namespace", "schema", "table"))
+ new FlushEvent(TableId.tableId("table"), 1L),
+ new FlushEvent(TableId.tableId("schema", "table"), 2L),
+ new FlushEvent(TableId.tableId("namespace", "schema", "table"), 3L)
};
Event[] dataChangeEvents = new DataChangeEventSerializerTest().getTestData();
Event[] schemaChangeEvents = new SchemaChangeEventSerializerTest().getTestData();
diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/event/PartitioningEventSerializerTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/event/PartitioningEventSerializerTest.java
index db93ec6831a..6b8a77a12ea 100644
--- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/event/PartitioningEventSerializerTest.java
+++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/event/PartitioningEventSerializerTest.java
@@ -51,9 +51,9 @@ protected Class getTypeClass() {
protected PartitioningEvent[] getTestData() {
Event[] flushEvents =
new Event[] {
- new FlushEvent(TableId.tableId("table")),
- new FlushEvent(TableId.tableId("schema", "table")),
- new FlushEvent(TableId.tableId("namespace", "schema", "table"))
+ new FlushEvent(TableId.tableId("table"), 1L),
+ new FlushEvent(TableId.tableId("schema", "table"), 2L),
+ new FlushEvent(TableId.tableId("namespace", "schema", "table"), 3L)
};
Event[] dataChangeEvents = new DataChangeEventSerializerTest().getTestData();
Event[] schemaChangeEvents = new SchemaChangeEventSerializerTest().getTestData();
diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/EventOperatorTestHarness.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/EventOperatorTestHarness.java
index edb64adcd04..c272667c979 100644
--- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/EventOperatorTestHarness.java
+++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/EventOperatorTestHarness.java
@@ -173,9 +173,9 @@ public OP getOperator() {
return operator;
}
- public void registerTableSchema(TableId tableId, Schema schema) {
+ public void registerTableSchema(TableId tableId, Schema schema, long nonce) {
schemaRegistry.handleCoordinationRequest(
- new SchemaChangeRequest(tableId, new CreateTableEvent(tableId, schema), 0));
+ new SchemaChangeRequest(tableId, new CreateTableEvent(tableId, schema), 0, nonce));
schemaRegistry.handleApplyEvolvedSchemaChangeRequest(new CreateTableEvent(tableId, schema));
}
@@ -254,7 +254,10 @@ public void collect(StreamRecord record) {
schemaRegistryGateway.sendOperatorEventToCoordinator(
SINK_OPERATOR_ID,
new SerializedValue<>(
- new FlushSuccessEvent(0, ((FlushEvent) event).getTableId())));
+ new FlushSuccessEvent(
+ 0,
+ ((FlushEvent) event).getTableId(),
+ ((FlushEvent) event).getNonce())));
} catch (IOException e) {
throw new RuntimeException(e);
}