From 3109474ea07d9954b6e3519113874ef046dc35fc Mon Sep 17 00:00:00 2001 From: yuxiqian <34335406+yuxiqian@users.noreply.github.com> Date: Fri, 1 Nov 2024 18:28:00 +0800 Subject: [PATCH 1/2] [FLINK-36690][runtime] Fix schema operator hanging under extreme parallelized pressure Signed-off-by: yuxiqian <34335406+yuxiqian@users.noreply.github.com> --- .../flink/cdc/common/event/FlushEvent.java | 22 +- .../pom.xml | 14 ++ .../MySqlParallelizedPipelineITCase.java | 207 ++++++++++++++++++ .../mysql/testutils/MySqSourceTestUtils.java | 17 ++ .../sink/v2/bucket/BucketAssignOperator.java | 4 +- .../bucket/BucketWrapperEventSerializer.java | 3 +- .../v2/bucket/BucketWrapperFlushEvent.java | 4 +- .../cdc/pipeline/tests/RouteE2eITCase.java | 97 +++++++- .../operators/schema/SchemaOperator.java | 45 ++-- .../schema/coordinator/SchemaRegistry.java | 12 +- .../SchemaRegistryRequestHandler.java | 166 ++++++-------- .../schema/event/FlushSuccessEvent.java | 19 +- .../schema/event/SchemaChangeRequest.java | 17 +- .../schema/event/SchemaChangeResponse.java | 11 +- .../event/SchemaChangeResultRequest.java | 32 +++ .../sink/DataSinkFunctionOperator.java | 2 +- .../sink/DataSinkWriterOperator.java | 2 +- .../operators/sink/SchemaEvolutionClient.java | 5 +- .../serializer/event/EventSerializer.java | 7 +- .../cdc/runtime/typeutils/NonceUtils.java | 52 +++++ .../operators/schema/SchemaEvolveTest.java | 196 +++++++++++++---- .../PrePartitionOperatorTest.java | 10 +- .../serializer/event/EventSerializerTest.java | 6 +- .../PartitioningEventSerializerTest.java | 6 +- .../operators/EventOperatorTestHarness.java | 9 +- 25 files changed, 762 insertions(+), 203 deletions(-) create mode 100644 flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlParallelizedPipelineITCase.java create mode 100644 flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/typeutils/NonceUtils.java diff --git 
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/FlushEvent.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/FlushEvent.java index 468faa8b5b6..798552e0499 100644 --- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/FlushEvent.java +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/FlushEvent.java @@ -28,14 +28,25 @@ public class FlushEvent implements Event { /** The schema changes from which table. */ private final TableId tableId; - public FlushEvent(TableId tableId) { + /** + * Nonce code to distinguish flush events corresponding to each schema change event from + * different subTasks. + */ + private final long nonce; + + public FlushEvent(TableId tableId, long nonce) { this.tableId = tableId; + this.nonce = nonce; } public TableId getTableId() { return tableId; } + public long getNonce() { + return nonce; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -45,11 +56,16 @@ public boolean equals(Object o) { return false; } FlushEvent that = (FlushEvent) o; - return Objects.equals(tableId, that.tableId); + return Objects.equals(tableId, that.tableId) && Objects.equals(nonce, that.nonce); } @Override public int hashCode() { - return Objects.hash(tableId); + return Objects.hash(tableId, nonce); + } + + @Override + public String toString() { + return "FlushEvent{" + "tableId=" + tableId + ", nonce=" + nonce + '}'; } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/pom.xml b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/pom.xml index 944175e52a0..155c8240dc5 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/pom.xml +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/pom.xml @@ -46,6 +46,20 @@ limitations under the License. 
test-jar + + org.apache.flink + flink-cdc-composer + ${project.version} + test + + + + org.apache.flink + flink-cdc-pipeline-connector-values + ${project.version} + test + + org.apache.flink flink-connector-test-util diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlParallelizedPipelineITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlParallelizedPipelineITCase.java new file mode 100644 index 00000000000..4bd9806a580 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlParallelizedPipelineITCase.java @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.cdc.connectors.mysql.source; + +import org.apache.flink.cdc.common.configuration.Configuration; +import org.apache.flink.cdc.common.pipeline.PipelineOptions; +import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior; +import org.apache.flink.cdc.composer.PipelineExecution; +import org.apache.flink.cdc.composer.definition.PipelineDef; +import org.apache.flink.cdc.composer.definition.SinkDef; +import org.apache.flink.cdc.composer.definition.SourceDef; +import org.apache.flink.cdc.composer.flink.FlinkPipelineComposer; +import org.apache.flink.cdc.connectors.mysql.factory.MySqlDataSourceFactory; +import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase; +import org.apache.flink.cdc.connectors.values.ValuesDatabase; +import org.apache.flink.cdc.connectors.values.factory.ValuesDataFactory; +import org.apache.flink.cdc.connectors.values.sink.ValuesDataSinkOptions; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; +import java.time.Duration; +import java.util.Collections; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +import static org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.TEST_PASSWORD; +import static org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.TEST_USER; +import static org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.getServerId; +import static org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.loopCheck; +import static org.apache.flink.configuration.CoreOptions.ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL; +import static org.assertj.core.api.Assertions.assertThat; + +/** Parallelized Integration test for MySQL connector. 
*/ +public class MySqlParallelizedPipelineITCase extends MySqlSourceTestBase { + + private static final int PARALLELISM = 4; + private static final int TEST_TABLE_NUMBER = 100; + + // Always use parent-first classloader for CDC classes. + // The reason is that ValuesDatabase uses static field for holding data, we need to make sure + // the class is loaded by AppClassloader so that we can verify data in the test case. + private static final org.apache.flink.configuration.Configuration MINI_CLUSTER_CONFIG = + new org.apache.flink.configuration.Configuration(); + + static { + MINI_CLUSTER_CONFIG.set( + ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL, + Collections.singletonList("org.apache.flink.cdc")); + } + + private final PrintStream standardOut = System.out; + private final ByteArrayOutputStream outCaptor = new ByteArrayOutputStream(); + + private final UniqueDatabase parallelismDatabase = + new UniqueDatabase( + MYSQL_CONTAINER, "extreme_parallelism_test_database", TEST_USER, TEST_PASSWORD); + + @Before + public void init() { + // Take over STDOUT as we need to check the output of values sink + System.setOut(new PrintStream(outCaptor)); + // Initialize in-memory database + ValuesDatabase.clear(); + } + + @After + public void cleanup() { + System.setOut(standardOut); + } + + @Test + public void testExtremeParallelizedSchemaChange() throws Exception { + final String databaseName = parallelismDatabase.getDatabaseName(); + try (Connection conn = + DriverManager.getConnection( + MYSQL_CONTAINER.getJdbcUrl(), TEST_USER, TEST_PASSWORD); + Statement stat = conn.createStatement()) { + stat.execute(String.format("CREATE DATABASE %s;", databaseName)); + stat.execute(String.format("USE %s;", databaseName)); + for (int i = 1; i <= TEST_TABLE_NUMBER; i++) { + stat.execute(String.format("DROP TABLE IF EXISTS TABLE%d;", i)); + stat.execute( + String.format( + "CREATE TABLE TABLE%d (ID INT NOT NULL PRIMARY KEY,VERSION VARCHAR(17));", + i)); + stat.execute(String.format("INSERT 
INTO TABLE%d VALUES (%d, 'No.%d');", i, i, i)); + } + } catch (SQLException e) { + LOG.error("Initialize table failed.", e); + throw e; + } + LOG.info("Table initialized successfully."); + + FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster(); + + // Setup MySQL source + Configuration sourceConfig = new Configuration(); + sourceConfig.set(MySqlDataSourceOptions.HOSTNAME, MYSQL_CONTAINER.getHost()); + sourceConfig.set(MySqlDataSourceOptions.PORT, MYSQL_CONTAINER.getDatabasePort()); + sourceConfig.set(MySqlDataSourceOptions.USERNAME, TEST_USER); + sourceConfig.set(MySqlDataSourceOptions.PASSWORD, TEST_PASSWORD); + sourceConfig.set(MySqlDataSourceOptions.SERVER_TIME_ZONE, "UTC"); + sourceConfig.set(MySqlDataSourceOptions.TABLES, "\\.*.\\.*"); + sourceConfig.set(MySqlDataSourceOptions.SERVER_ID, getServerId(PARALLELISM)); + + SourceDef sourceDef = + new SourceDef(MySqlDataSourceFactory.IDENTIFIER, "MySQL Source", sourceConfig); + + // Setup value sink + Configuration sinkConfig = new Configuration(); + sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true); + SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig); + + // Setup pipeline + Configuration pipelineConfig = new Configuration(); + pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, PARALLELISM); + pipelineConfig.set( + PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); + PipelineDef pipelineDef = + new PipelineDef( + sourceDef, + sinkDef, + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + pipelineConfig); + + // Execute the pipeline + PipelineExecution execution = composer.compose(pipelineDef); + Thread executeThread = + new Thread( + () -> { + try { + execution.execute(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + + executeThread.start(); + + try { + loopCheck( + () -> + outCaptor.toString().trim().split("\n").length + >= TEST_TABLE_NUMBER * (PARALLELISM + 
1), + "collect enough rows", + Duration.ofSeconds(120), + Duration.ofSeconds(1)); + } finally { + executeThread.interrupt(); + } + + // Check the order and content of all received events + String outputEvents = outCaptor.toString(); + assertThat(outputEvents) + .contains( + IntStream.rangeClosed(1, TEST_TABLE_NUMBER) + .boxed() + .flatMap( + i -> + Stream.concat( + IntStream.range(0, PARALLELISM) + .boxed() + .map( + subTaskId -> + String.format( + "%d> CreateTableEvent{tableId=%s.TABLE%d, schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17)}, primaryKeys=ID, options=()}", + subTaskId, + parallelismDatabase + .getDatabaseName(), + i)), + Stream.of( + String.format( + "> DataChangeEvent{tableId=%s.TABLE%d, before=[], after=[%d, No.%d], op=INSERT, meta=()}", + parallelismDatabase + .getDatabaseName(), + i, + i, + i)))) + .collect(Collectors.toList())); + } +} diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/testutils/MySqSourceTestUtils.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/testutils/MySqSourceTestUtils.java index a76838d664c..fee7ac5b9c0 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/testutils/MySqSourceTestUtils.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/testutils/MySqSourceTestUtils.java @@ -20,10 +20,13 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.cdc.common.event.CreateTableEvent; +import java.time.Duration; import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Random; +import java.util.concurrent.TimeoutException; +import java.util.function.Supplier; /** Test utilities for MySQL event 
source. */ public class MySqSourceTestUtils { @@ -64,4 +67,18 @@ public static String getServerId(int parallelism) { } private MySqSourceTestUtils() {} + + public static void loopCheck( + Supplier runnable, String description, Duration timeout, Duration interval) + throws Exception { + long deadline = System.currentTimeMillis() + timeout.toMillis(); + while (System.currentTimeMillis() < deadline) { + if (runnable.get()) { + return; + } + Thread.sleep(interval.toMillis()); + } + throw new TimeoutException( + "Ran out of time when waiting for " + description + " to success."); + } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketAssignOperator.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketAssignOperator.java index 358d7c36ce3..3a9801fcdea 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketAssignOperator.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketAssignOperator.java @@ -124,7 +124,9 @@ public void processElement(StreamRecord streamRecord) throws Exception { output.collect( new StreamRecord<>( new BucketWrapperFlushEvent( - currentTaskNumber, ((FlushEvent) event).getTableId()))); + currentTaskNumber, + ((FlushEvent) event).getTableId(), + ((FlushEvent) event).getNonce()))); return; } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperEventSerializer.java 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperEventSerializer.java index f37d9952f98..57803643b55 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperEventSerializer.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperEventSerializer.java @@ -82,6 +82,7 @@ public void serialize(Event event, DataOutputView dataOutputView) throws IOExcep BucketWrapperFlushEvent bucketWrapperFlushEvent = (BucketWrapperFlushEvent) event; dataOutputView.writeInt(bucketWrapperFlushEvent.getBucket()); tableIdSerializer.serialize(bucketWrapperFlushEvent.getTableId(), dataOutputView); + dataOutputView.writeLong(bucketWrapperFlushEvent.getNonce()); } } @@ -90,7 +91,7 @@ public Event deserialize(DataInputView source) throws IOException { EventClass eventClass = enumSerializer.deserialize(source); if (eventClass.equals(EventClass.BUCKET_WRAPPER_FLUSH_EVENT)) { return new BucketWrapperFlushEvent( - source.readInt(), tableIdSerializer.deserialize(source)); + source.readInt(), tableIdSerializer.deserialize(source), source.readLong()); } else { return new BucketWrapperChangeEvent( source.readInt(), (ChangeEvent) eventSerializer.deserialize(source)); diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperFlushEvent.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperFlushEvent.java index 046a22a318f..5de479e5fa2 100644 --- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperFlushEvent.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperFlushEvent.java @@ -27,8 +27,8 @@ public class BucketWrapperFlushEvent extends FlushEvent implements BucketWrapper private final int bucket; - public BucketWrapperFlushEvent(int bucket, TableId tableId) { - super(tableId); + public BucketWrapperFlushEvent(int bucket, TableId tableId, long nonce) { + super(tableId, nonce); this.bucket = bucket; } diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/RouteE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/RouteE2eITCase.java index c8072cf7d3e..c9b26c7818b 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/RouteE2eITCase.java +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/RouteE2eITCase.java @@ -24,7 +24,6 @@ import org.apache.flink.cdc.pipeline.tests.utils.PipelineTestEnvironment; import org.junit.After; -import org.junit.Assert; import org.junit.Before; import org.junit.ClassRule; import org.junit.Test; @@ -41,6 +40,7 @@ import java.sql.Statement; import java.time.Duration; import java.util.concurrent.TimeoutException; +import java.util.stream.IntStream; /** E2e tests for routing features. 
*/ @RunWith(Parameterized.class) @@ -55,6 +55,7 @@ public class RouteE2eITCase extends PipelineTestEnvironment { protected static final String MYSQL_DRIVER_CLASS = "com.mysql.cj.jdbc.Driver"; protected static final String INTER_CONTAINER_MYSQL_ALIAS = "mysql"; protected static final long EVENT_DEFAULT_TIMEOUT = 60000L; + protected static final int TEST_TABLE_NUMBER = 100; @ClassRule public static final MySqlContainer MYSQL = @@ -73,6 +74,9 @@ public class RouteE2eITCase extends PipelineTestEnvironment { protected final UniqueDatabase routeTestDatabase = new UniqueDatabase(MYSQL, "route_test", MYSQL_TEST_USER, MYSQL_TEST_PASSWORD); + protected final UniqueDatabase extremeRouteTestDatabase = + new UniqueDatabase(MYSQL, "extreme_route_test", MYSQL_TEST_USER, MYSQL_TEST_PASSWORD); + @Before public void before() throws Exception { super.before(); @@ -814,15 +818,98 @@ public void testReplacementSymbol() throws Exception { "DataChangeEvent{tableId=NEW_%s.NEW_TABLEDELTA, before=[], after=[10004], op=INSERT, meta=()}"); } + @Test + public void testExtremeMergeTableRoute() throws Exception { + final String databaseName = extremeRouteTestDatabase.getDatabaseName(); + try (Connection conn = + DriverManager.getConnection( + MYSQL.getJdbcUrl(), MYSQL_TEST_USER, MYSQL_TEST_PASSWORD); + Statement stat = conn.createStatement()) { + stat.execute(String.format("CREATE DATABASE %s;", databaseName)); + stat.execute(String.format("USE %s;", databaseName)); + for (int i = 1; i <= TEST_TABLE_NUMBER; i++) { + stat.execute(String.format("DROP TABLE IF EXISTS TABLE%d;", i)); + stat.execute( + String.format( + "CREATE TABLE TABLE%d (ID INT NOT NULL PRIMARY KEY,VERSION VARCHAR(17));", + i)); + stat.execute(String.format("INSERT INTO TABLE%d VALUES (%d, 'No.%d');", i, i, i)); + } + } catch (SQLException e) { + LOG.error("Initialize table failed.", e); + throw e; + } + LOG.info("Table initialized successfully."); + + String pipelineJob = + String.format( + "source:\n" + + " type: mysql\n" + + " 
hostname: %s\n" + + " port: 3306\n" + + " username: %s\n" + + " password: %s\n" + + " tables: %s.\\.*\n" + + " server-id: 5400-5404\n" + + " server-time-zone: UTC\n" + + "\n" + + "sink:\n" + + " type: values\n" + + "\n" + + "pipeline:\n" + + " parallelism: %d", + INTER_CONTAINER_MYSQL_ALIAS, + MYSQL_TEST_USER, + MYSQL_TEST_PASSWORD, + databaseName, + parallelism); + Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar"); + Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar"); + Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar"); + submitPipelineJob(pipelineJob, mysqlCdcJar, valuesCdcJar, mysqlDriverJar); + waitUntilJobRunning(Duration.ofSeconds(30)); + + LOG.info("Verifying CreateTableEvents..."); + validateResult( + 180_000L, + IntStream.rangeClosed(1, TEST_TABLE_NUMBER) + .mapToObj( + i -> + String.format( + "> CreateTableEvent{tableId=%s.TABLE%d, schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17)}, primaryKeys=ID, options=()}", + databaseName, i)) + .toArray(String[]::new)); + + LOG.info("Verifying DataChangeEvents..."); + validateResult( + 180_000L, + IntStream.rangeClosed(1, TEST_TABLE_NUMBER) + .mapToObj( + i -> + String.format( + "> DataChangeEvent{tableId=%s.TABLE%d, before=[], after=[%d, No.%d], op=INSERT, meta=()}", + databaseName, i, i, i)) + .toArray(String[]::new)); + } + private void validateResult(String... expectedEvents) throws Exception { + validateResult(EVENT_DEFAULT_TIMEOUT, expectedEvents); + } + + private void validateResult(long timeout, String... 
expectedEvents) throws Exception { for (String event : expectedEvents) { - waitUntilSpecificEvent(String.format(event, routeTestDatabase.getDatabaseName())); + waitUntilSpecificEvent( + timeout, String.format(event, routeTestDatabase.getDatabaseName())); } } private void waitUntilSpecificEvent(String event) throws Exception { + waitUntilSpecificEvent(EVENT_DEFAULT_TIMEOUT, event); + } + + private void waitUntilSpecificEvent(long timeout, String event) throws Exception { boolean result = false; - long endTimeout = System.currentTimeMillis() + EVENT_DEFAULT_TIMEOUT; + long endTimeout = System.currentTimeMillis() + timeout; while (System.currentTimeMillis() < endTimeout) { String stdout = taskManagerConsumer.toUtf8String(); if (stdout.contains(event + "\n")) { @@ -839,8 +926,4 @@ private void waitUntilSpecificEvent(String event) throws Exception { + taskManagerConsumer.toUtf8String()); } } - - private void assertNotExists(String event) { - Assert.assertFalse(taskManagerConsumer.toUtf8String().contains(event)); - } } diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java index 6ada032d56f..09bac82faf0 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java @@ -53,6 +53,7 @@ import org.apache.flink.cdc.runtime.operators.schema.metrics.SchemaOperatorMetrics; import org.apache.flink.cdc.runtime.operators.sink.SchemaEvolutionClient; import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator; +import org.apache.flink.cdc.runtime.typeutils.NonceUtils; import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway; import org.apache.flink.runtime.operators.coordination.CoordinationRequest; import 
org.apache.flink.runtime.operators.coordination.CoordinationResponse; @@ -78,6 +79,7 @@ import java.io.Serializable; import java.math.BigDecimal; import java.time.Duration; +import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneId; import java.util.ArrayList; @@ -430,6 +432,11 @@ private TableId resolveReplacement( return TableId.parse(route.f1); } + @VisibleForTesting + protected int getCurrentTimestamp() { + return (int) Instant.now().getEpochSecond(); + } + private void handleSchemaChangeEvent(TableId tableId, SchemaChangeEvent schemaChangeEvent) throws InterruptedException, TimeoutException { @@ -442,16 +449,21 @@ private void handleSchemaChangeEvent(TableId tableId, SchemaChangeEvent schemaCh schemaChangeEvent)); } + long nonce = + NonceUtils.generateNonce( + getCurrentTimestamp(), subTaskId, tableId, schemaChangeEvent); + + LOG.info("{}> Sending the FlushEvent for table {} (nonce: {}).", subTaskId, tableId, nonce); + output.collect(new StreamRecord<>(new FlushEvent(tableId, nonce))); + // The request will block if another schema change event is being handled - SchemaChangeResponse response = requestSchemaChange(tableId, schemaChangeEvent); + SchemaChangeResponse response = requestSchemaChange(tableId, schemaChangeEvent, nonce); if (response.isAccepted()) { - LOG.info("{}> Sending the FlushEvent for table {}.", subTaskId, tableId); - output.collect(new StreamRecord<>(new FlushEvent(tableId))); List expectedSchemaChangeEvents = response.getSchemaChangeEvents(); schemaOperatorMetrics.increaseSchemaChangeEvents(expectedSchemaChangeEvents.size()); // The request will block until flushing finished in each sink writer - SchemaChangeResultResponse schemaEvolveResponse = requestSchemaChangeResult(); + SchemaChangeResultResponse schemaEvolveResponse = requestSchemaChangeResult(nonce); List finishedSchemaChangeEvents = schemaEvolveResponse.getFinishedSchemaChangeEvents(); @@ -473,37 +485,44 @@ private void handleSchemaChangeEvent(TableId 
tableId, SchemaChangeEvent schemaCh } private SchemaChangeResponse requestSchemaChange( - TableId tableId, SchemaChangeEvent schemaChangeEvent) + TableId tableId, SchemaChangeEvent schemaChangeEvent, long nonce) throws InterruptedException, TimeoutException { long schemaEvolveTimeOutMillis = System.currentTimeMillis() + rpcTimeOutInMillis; while (true) { SchemaChangeResponse response = sendRequestToCoordinator( - new SchemaChangeRequest(tableId, schemaChangeEvent, subTaskId)); - if (response.isRegistryBusy()) { - if (System.currentTimeMillis() < schemaEvolveTimeOutMillis) { + new SchemaChangeRequest(tableId, schemaChangeEvent, subTaskId, nonce)); + if (System.currentTimeMillis() < schemaEvolveTimeOutMillis) { + if (response.isRegistryBusy()) { LOG.info( "{}> Schema Registry is busy now, waiting for next request...", subTaskId); Thread.sleep(1000); + } else if (response.isWaitingForFlush()) { + LOG.info( + "{}> Schema change event (with nonce {}) has not collected enough flush success events from writers, waiting...", + subTaskId, + nonce); + Thread.sleep(1000); } else { - throw new TimeoutException("TimeOut when requesting schema change"); + return response; } } else { - return response; + throw new TimeoutException("TimeOut when requesting schema change"); } } } - private SchemaChangeResultResponse requestSchemaChangeResult() + private SchemaChangeResultResponse requestSchemaChangeResult(long nonce) throws InterruptedException, TimeoutException { CoordinationResponse coordinationResponse = - sendRequestToCoordinator(new SchemaChangeResultRequest()); + sendRequestToCoordinator(new SchemaChangeResultRequest(nonce)); long nextRpcTimeOutMillis = System.currentTimeMillis() + rpcTimeOutInMillis; while (coordinationResponse instanceof SchemaChangeProcessingResponse) { if (System.currentTimeMillis() < nextRpcTimeOutMillis) { Thread.sleep(1000); - coordinationResponse = sendRequestToCoordinator(new SchemaChangeResultRequest()); + coordinationResponse = + 
sendRequestToCoordinator(new SchemaChangeResultRequest(nonce)); } else { throw new TimeoutException("TimeOut when requesting release upstream"); } diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java index b7223c633b6..9c13b9ed5fe 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java @@ -180,13 +180,12 @@ public void handleEventFromOperator(int subtask, int attemptNumber, OperatorEven if (event instanceof FlushSuccessEvent) { FlushSuccessEvent flushSuccessEvent = (FlushSuccessEvent) event; LOG.info( - "Sink subtask {} succeed flushing for table {}.", + "Sink subtask {} succeed flushing for table {} (nonce: {}).", flushSuccessEvent.getSubtask(), - flushSuccessEvent.getTableId().toString()); + flushSuccessEvent.getTableId().toString(), + flushSuccessEvent.getNonce()); requestHandler.flushSuccess( - flushSuccessEvent.getTableId(), - flushSuccessEvent.getSubtask(), - currentParallelism); + flushSuccessEvent.getSubtask(), flushSuccessEvent.getNonce()); } else if (event instanceof SinkWriterRegisterEvent) { requestHandler.registerSinkWriter( ((SinkWriterRegisterEvent) event).getSubtask()); @@ -273,7 +272,8 @@ public CompletableFuture handleCoordinationRequest( requestHandler.handleSchemaChangeRequest( schemaChangeRequest, responseFuture); } else if (request instanceof SchemaChangeResultRequest) { - requestHandler.getSchemaChangeResult(responseFuture); + requestHandler.getSchemaChangeResult( + (SchemaChangeResultRequest) request, responseFuture); } else if (request instanceof GetEvolvedSchemaRequest) { handleGetEvolvedSchemaRequest( ((GetEvolvedSchemaRequest) request), responseFuture); diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java index b2a8a822df2..1e4d231b2f7 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java @@ -37,6 +37,7 @@ import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeProcessingResponse; import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeRequest; import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeResponse; +import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeResultRequest; import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeResultResponse; import org.apache.flink.runtime.operators.coordination.CoordinationResponse; import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; @@ -80,16 +81,14 @@ public class SchemaRegistryRequestHandler implements Closeable { */ private volatile RequestStatus schemaChangeStatus; - private final List pendingSubTaskIds; private final Object schemaChangeRequestLock; private volatile Throwable currentChangeException; - private volatile List currentDerivedSchemaChangeEvents; private volatile List currentFinishedSchemaChanges; private volatile List currentIgnoredSchemaChanges; /** Sink writers which have sent flush success events for the request. */ - private final Set flushedSinkWriters; + private final ConcurrentHashMap> flushedSinkWriters; /** Executor service to execute schema change. 
*/ private final ExecutorService schemaChangeThreadPool; @@ -98,6 +97,8 @@ public class SchemaRegistryRequestHandler implements Closeable { private final OperatorCoordinator.Context context; + private final int parallelism; + public SchemaRegistryRequestHandler( MetadataApplier metadataApplier, SchemaManager schemaManager, @@ -111,16 +112,18 @@ public SchemaRegistryRequestHandler( this.context = context; this.activeSinkWriters = ConcurrentHashMap.newKeySet(); - this.flushedSinkWriters = ConcurrentHashMap.newKeySet(); + this.flushedSinkWriters = new ConcurrentHashMap<>(); this.schemaChangeThreadPool = Executors.newSingleThreadExecutor(); - this.currentDerivedSchemaChangeEvents = new ArrayList<>(); this.currentFinishedSchemaChanges = new ArrayList<>(); this.currentIgnoredSchemaChanges = new ArrayList<>(); this.schemaChangeStatus = RequestStatus.IDLE; - this.pendingSubTaskIds = new ArrayList<>(); this.schemaChangeRequestLock = new Object(); + + // This check is meant to allow migration test pass since we don't have a valid + // `OperatorCoordinator.Context` in mocked environment. + this.parallelism = context != null ? context.currentParallelism() : 0; } /** @@ -131,53 +134,45 @@ public SchemaRegistryRequestHandler( public void handleSchemaChangeRequest( SchemaChangeRequest request, CompletableFuture response) { - // We use requester subTask ID as the pending ticket, because there will be at most 1 schema - // change requests simultaneously from each subTask - int requestSubTaskId = request.getSubTaskId(); + // We use nonce to identify each schema change request + long nonce = request.getNonce(); synchronized (schemaChangeRequestLock) { // Make sure we handle the first request in the pending list to avoid out-of-order // waiting and blocks checkpointing mechanism. if (schemaChangeStatus == RequestStatus.IDLE) { - if (pendingSubTaskIds.isEmpty()) { - LOG.info( - "Received schema change event request {} from table {} from subTask {}. 
Pending list is empty, handling this.", - request.getSchemaChangeEvent(), - request.getTableId().toString(), - requestSubTaskId); - } else if (pendingSubTaskIds.get(0) == requestSubTaskId) { - LOG.info( - "Received schema change event request {} from table {} from subTask {}. It is on the first of the pending list, handling this.", - request.getSchemaChangeEvent(), - request.getTableId().toString(), - requestSubTaskId); - pendingSubTaskIds.remove(0); - } else { - LOG.info( - "Received schema change event request {} from table {} from subTask {}. It is not the first of the pending list ({}).", - request.getSchemaChangeEvent(), - request.getTableId().toString(), - requestSubTaskId, - pendingSubTaskIds); - if (!pendingSubTaskIds.contains(requestSubTaskId)) { - pendingSubTaskIds.add(requestSubTaskId); - } - response.complete(wrap(SchemaChangeResponse.busy())); - return; - } - SchemaChangeEvent event = request.getSchemaChangeEvent(); // If this schema change event has been requested by another subTask, ignore it. if (schemaManager.isOriginalSchemaChangeEventRedundant(event)) { LOG.info("Event {} has been addressed before, ignoring it.", event); - clearCurrentSchemaChangeRequest(); + clearCurrentSchemaChangeRequest(nonce); LOG.info( "SchemaChangeStatus switched from WAITING_FOR_FLUSH to IDLE for request {} due to duplicated request.", request); response.complete(wrap(SchemaChangeResponse.duplicate())); return; } + + if (activeSinkWriters.size() < parallelism) { + LOG.info( + "Not all active sink writers have been registered. Current {}, expected {}.", + activeSinkWriters.size(), + parallelism); + response.complete(wrap(SchemaChangeResponse.waitingForFlush())); + return; + } + + if (!activeSinkWriters.equals(flushedSinkWriters.get(nonce))) { + LOG.info( + "Not all active sink writers have completed flush (nonce: {}). 
Flushed writers: {}, expected: {}.", + nonce, + flushedSinkWriters.get(nonce), + activeSinkWriters); + response.complete(wrap(SchemaChangeResponse.waitingForFlush())); + return; + } + schemaManager.applyOriginalSchemaChange(event); List derivedSchemaChangeEvents = calculateDerivedSchemaChangeEvents(request.getSchemaChangeEvent()); @@ -186,7 +181,7 @@ public void handleSchemaChangeRequest( // route strategies, ignore it. if (derivedSchemaChangeEvents.isEmpty()) { LOG.info("Event {} is omitted from sending to downstream, ignoring it.", event); - clearCurrentSchemaChangeRequest(); + clearCurrentSchemaChangeRequest(nonce); LOG.info( "SchemaChangeStatus switched from WAITING_FOR_FLUSH to IDLE for request {} due to ignored request.", request); @@ -196,9 +191,9 @@ public void handleSchemaChangeRequest( } LOG.info( - "SchemaChangeStatus switched from IDLE to WAITING_FOR_FLUSH, other requests will be blocked."); + "SchemaChangeStatus switched from IDLE to APPLYING, other requests will be blocked."); // This request has been accepted. - schemaChangeStatus = RequestStatus.WAITING_FOR_FLUSH; + schemaChangeStatus = RequestStatus.APPLYING; // Backfill pre-schema info for sink applying derivedSchemaChangeEvents.forEach( @@ -213,18 +208,16 @@ public void handleSchemaChangeRequest( } } }); - currentDerivedSchemaChangeEvents = new ArrayList<>(derivedSchemaChangeEvents); response.complete(wrap(SchemaChangeResponse.accepted(derivedSchemaChangeEvents))); - } else { + LOG.info( - "Schema Registry is busy processing a schema change request, could not handle request {} for now. Added {} to pending list ({}).", - request, - requestSubTaskId, - pendingSubTaskIds); - if (!pendingSubTaskIds.contains(requestSubTaskId)) { - pendingSubTaskIds.add(requestSubTaskId); - } + "All sink subtask have flushed for table {}. 
Start to apply schema change request {}.", + request.getTableId().toString(), + request); + schemaChangeThreadPool.submit( + () -> applySchemaChange(request.getTableId(), derivedSchemaChangeEvents)); + } else { response.complete(wrap(SchemaChangeResponse.busy())); } } @@ -277,9 +270,7 @@ private void applySchemaChange( "Illegal schemaChangeStatus state: should be APPLYING before applySchemaChange finishes, not " + schemaChangeStatus); schemaChangeStatus = RequestStatus.FINISHED; - LOG.info( - "SchemaChangeStatus switched from APPLYING to FINISHED for request {}.", - currentDerivedSchemaChangeEvents); + LOG.info("SchemaChangeStatus switched from APPLYING to FINISHED."); } /** @@ -295,45 +286,31 @@ public void registerSinkWriter(int sinkSubtask) { /** * Record flushed sink subtasks after receiving FlushSuccessEvent. * - * @param tableId the subtask in SchemaOperator and table that the FlushEvent is about * @param sinkSubtask the sink subtask succeed flushing */ - public void flushSuccess(TableId tableId, int sinkSubtask, int parallelism) { - flushedSinkWriters.add(sinkSubtask); - if (activeSinkWriters.size() < parallelism) { - LOG.info( - "Not all active sink writers have been registered. Current {}, expected {}.", - activeSinkWriters.size(), - parallelism); - return; - } - if (flushedSinkWriters.equals(activeSinkWriters)) { - Preconditions.checkState( - schemaChangeStatus == RequestStatus.WAITING_FOR_FLUSH, - "Illegal schemaChangeStatus state: should be WAITING_FOR_FLUSH before collecting enough FlushEvents, not " - + schemaChangeStatus); - - schemaChangeStatus = RequestStatus.APPLYING; - LOG.info( - "All sink subtask have flushed for table {}. 
Start to apply schema change.", - tableId.toString()); - schemaChangeThreadPool.submit( - () -> applySchemaChange(tableId, currentDerivedSchemaChangeEvents)); + public void flushSuccess(int sinkSubtask, long nonce) { + synchronized (schemaChangeRequestLock) { + if (!flushedSinkWriters.containsKey(nonce)) { + flushedSinkWriters.put(nonce, ConcurrentHashMap.newKeySet()); + } + flushedSinkWriters.get(nonce).add(sinkSubtask); } } - public void getSchemaChangeResult(CompletableFuture response) { + public void getSchemaChangeResult( + SchemaChangeResultRequest request, CompletableFuture response) { Preconditions.checkState( schemaChangeStatus != RequestStatus.IDLE, "Illegal schemaChangeStatus: should not be IDLE before getting schema change request results."); if (schemaChangeStatus == RequestStatus.FINISHED) { schemaChangeStatus = RequestStatus.IDLE; LOG.info( - "SchemaChangeStatus switched from FINISHED to IDLE for request {}", - currentDerivedSchemaChangeEvents); + "SchemaChangeStatus switched from FINISHED to IDLE. 
(nonce: {})", + request.getNonce()); // This request has been finished, return it and prepare for the next request - List finishedEvents = clearCurrentSchemaChangeRequest(); + List finishedEvents = + clearCurrentSchemaChangeRequest(request.getNonce()); SchemaChangeResultResponse resultResponse = new SchemaChangeResultResponse(finishedEvents); response.complete(wrap(resultResponse)); @@ -470,15 +447,14 @@ private boolean shouldIgnoreException(Throwable throwable) { && (schemaChangeBehavior == SchemaChangeBehavior.TRY_EVOLVE); } - private List clearCurrentSchemaChangeRequest() { + private List clearCurrentSchemaChangeRequest(long nonce) { if (currentChangeException != null) { context.failJob( new RuntimeException("Failed to apply schema change.", currentChangeException)); } List finishedSchemaChanges = new ArrayList<>(currentFinishedSchemaChanges); - flushedSinkWriters.clear(); - currentDerivedSchemaChangeEvents.clear(); + flushedSinkWriters.remove(nonce); currentFinishedSchemaChanges.clear(); currentIgnoredSchemaChanges.clear(); currentChangeException = null; @@ -487,30 +463,24 @@ private List clearCurrentSchemaChangeRequest() { // Schema change event state could transfer in the following way: // - // -------- B -------- - // | | - // v | - // -------- --------------------- - // | IDLE | --- A --> | WAITING_FOR_FLUSH | - // -------- --------------------- + + // -------- + // | IDLE | -------------------A + // -------- | // ^ | - // E C + // C | // \ v // ------------ ------------ - // | FINISHED | <-- D -- | APPLYING | + // | FINISHED | <-- B -- | APPLYING | // ------------ ------------ // - // A: When a request came to an idling request handler. - // B: When current request is duplicate or ignored by LENIENT / routed table merging - // strategies. - // C: When schema registry collected enough flush success events, and actually started to apply - // schema changes. 
- // D: When schema change application finishes (successfully or with exceptions) - // E: When current schema change request result has been retrieved by SchemaOperator, and ready - // for the next request. + // A: When a request came to an idling request handler. Only possible when registry is IDLE, + // and it has collected all FlushEvents from sink writers. + // B: When schema change application finishes (successfully or with exceptions) + // C: When current schema change request result has been retrieved by SchemaOperator, and + // the registry is ready for the next request. private enum RequestStatus { IDLE, - WAITING_FOR_FLUSH, APPLYING, FINISHED } diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/FlushSuccessEvent.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/FlushSuccessEvent.java index 72951446456..6959e883b85 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/FlushSuccessEvent.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/FlushSuccessEvent.java @@ -36,9 +36,16 @@ public class FlushSuccessEvent implements OperatorEvent { /** The schema changes from which table is executing it. */ private final TableId tableId; - public FlushSuccessEvent(int subtask, TableId tableId) { + /** + * Nonce code to distinguish flush events corresponding to each schema change event from + * different subTasks.
+ */ + private final long nonce; + + public FlushSuccessEvent(int subtask, TableId tableId, long nonce) { this.subtask = subtask; this.tableId = tableId; + this.nonce = nonce; } public int getSubtask() { @@ -49,6 +56,10 @@ public TableId getTableId() { return tableId; } + public long getNonce() { + return nonce; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -58,11 +69,13 @@ public boolean equals(Object o) { return false; } FlushSuccessEvent that = (FlushSuccessEvent) o; - return subtask == that.subtask && Objects.equals(tableId, that.tableId); + return subtask == that.subtask + && Objects.equals(tableId, that.tableId) + && nonce == that.nonce; } @Override public int hashCode() { - return Objects.hash(subtask, tableId); + return Objects.hash(subtask, tableId, nonce); } } diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeRequest.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeRequest.java index fbc5e9c4037..fba1e505577 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeRequest.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeRequest.java @@ -38,12 +38,18 @@ public class SchemaChangeRequest implements CoordinationRequest { private final SchemaChangeEvent schemaChangeEvent; /** The ID of subTask that initiated the request. */ private final int subTaskId; + /** + * Nonce code to distinguish flush events corresponding to each schema change event from + * different subTasks. 
+ */ + private final long nonce; public SchemaChangeRequest( - TableId tableId, SchemaChangeEvent schemaChangeEvent, int subTaskId) { + TableId tableId, SchemaChangeEvent schemaChangeEvent, int subTaskId, long nonce) { this.tableId = tableId; this.schemaChangeEvent = schemaChangeEvent; this.subTaskId = subTaskId; + this.nonce = nonce; } public TableId getTableId() { @@ -58,6 +64,10 @@ public int getSubTaskId() { return subTaskId; } + public long getNonce() { + return nonce; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -69,11 +79,12 @@ public boolean equals(Object o) { SchemaChangeRequest that = (SchemaChangeRequest) o; return Objects.equals(tableId, that.tableId) && Objects.equals(schemaChangeEvent, that.schemaChangeEvent) - && subTaskId == that.subTaskId; + && subTaskId == that.subTaskId + && nonce == that.nonce; } @Override public int hashCode() { - return Objects.hash(tableId, schemaChangeEvent, subTaskId); + return Objects.hash(tableId, schemaChangeEvent, subTaskId, nonce); } } diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeResponse.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeResponse.java index 63d57139b10..bea0ce65ce1 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeResponse.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeResponse.java @@ -57,6 +57,10 @@ public static SchemaChangeResponse ignored() { return new SchemaChangeResponse(Collections.emptyList(), ResponseCode.IGNORED); } + public static SchemaChangeResponse waitingForFlush() { + return new SchemaChangeResponse(Collections.emptyList(), ResponseCode.WAITING_FOR_FLUSH); + } + private SchemaChangeResponse( List schemaChangeEvents, ResponseCode responseCode) { this.schemaChangeEvents = schemaChangeEvents; @@ -79,6 +83,10 @@ public boolean 
isIgnored() { return ResponseCode.IGNORED.equals(responseCode); } + public boolean isWaitingForFlush() { + return ResponseCode.WAITING_FOR_FLUSH.equals(responseCode); + } + public List getSchemaChangeEvents() { return schemaChangeEvents; } @@ -129,6 +137,7 @@ public enum ResponseCode { ACCEPTED, BUSY, DUPLICATE, - IGNORED + IGNORED, + WAITING_FOR_FLUSH } } diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeResultRequest.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeResultRequest.java index e53762c199f..8160494ce9f 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeResultRequest.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeResultRequest.java @@ -28,4 +28,36 @@ public class SchemaChangeResultRequest implements CoordinationRequest { private static final long serialVersionUID = 1L; + + /** + * Nonce code to distinguish flush events corresponding to each schema change event from + * different subTasks. 
+ */ + private final long nonce; + + public SchemaChangeResultRequest(long nonce) { + this.nonce = nonce; + } + + public long getNonce() { + return nonce; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + SchemaChangeResultRequest that = (SchemaChangeResultRequest) o; + return nonce == that.nonce; + } + + @Override + public int hashCode() { + return Long.hashCode(nonce); + } } diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkFunctionOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkFunctionOperator.java index 3767ba32123..b7f8f217faa 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkFunctionOperator.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkFunctionOperator.java @@ -113,7 +113,7 @@ public void processElement(StreamRecord element) throws Exception { private void handleFlushEvent(FlushEvent event) throws Exception { userFunction.finish(); schemaEvolutionClient.notifyFlushSuccess( - getRuntimeContext().getIndexOfThisSubtask(), event.getTableId()); + getRuntimeContext().getIndexOfThisSubtask(), event.getTableId(), event.getNonce()); } private void emitLatestSchema(TableId tableId) throws Exception { diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkWriterOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkWriterOperator.java index 554d7970272..a3719f91c16 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkWriterOperator.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkWriterOperator.java @@ -199,7 +199,7 @@ public void endInput() throws Exception { private void 
handleFlushEvent(FlushEvent event) throws Exception { copySinkWriter.flush(false); schemaEvolutionClient.notifyFlushSuccess( - getRuntimeContext().getIndexOfThisSubtask(), event.getTableId()); + getRuntimeContext().getIndexOfThisSubtask(), event.getTableId(), event.getNonce()); } private void emitLatestSchema(TableId tableId) throws Exception { diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/SchemaEvolutionClient.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/SchemaEvolutionClient.java index aa256787f16..0e7df4a2446 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/SchemaEvolutionClient.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/SchemaEvolutionClient.java @@ -60,9 +60,10 @@ public void registerSubtask(int subtask) throws IOException { } /** send {@link FlushSuccessEvent} to {@link SchemaRegistry}. */ - public void notifyFlushSuccess(int subtask, TableId tableId) throws IOException { + public void notifyFlushSuccess(int subtask, TableId tableId, long nonce) throws IOException { toCoordinator.sendOperatorEventToCoordinator( - schemaOperatorID, new SerializedValue<>(new FlushSuccessEvent(subtask, tableId))); + schemaOperatorID, + new SerializedValue<>(new FlushSuccessEvent(subtask, tableId, nonce))); } public Optional getLatestEvolvedSchema(TableId tableId) throws Exception { diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/event/EventSerializer.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/event/EventSerializer.java index c8f34020dfe..2c10ad73c49 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/event/EventSerializer.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/event/EventSerializer.java @@ -61,7 +61,9 @@ public Event createInstance() { @Override public Event copy(Event 
from) { if (from instanceof FlushEvent) { - return new FlushEvent(tableIdSerializer.copy(((FlushEvent) from).getTableId())); + return new FlushEvent( + tableIdSerializer.copy(((FlushEvent) from).getTableId()), + ((FlushEvent) from).getNonce()); } else if (from instanceof SchemaChangeEvent) { return schemaChangeEventSerializer.copy((SchemaChangeEvent) from); } else if (from instanceof DataChangeEvent) { @@ -85,6 +87,7 @@ public void serialize(Event record, DataOutputView target) throws IOException { if (record instanceof FlushEvent) { enumSerializer.serialize(EventClass.FLUSH_EVENT, target); tableIdSerializer.serialize(((FlushEvent) record).getTableId(), target); + target.writeLong(((FlushEvent) record).getNonce()); } else if (record instanceof SchemaChangeEvent) { enumSerializer.serialize(EventClass.SCHEME_CHANGE_EVENT, target); schemaChangeEventSerializer.serialize((SchemaChangeEvent) record, target); @@ -101,7 +104,7 @@ public Event deserialize(DataInputView source) throws IOException { EventClass eventClass = enumSerializer.deserialize(source); switch (eventClass) { case FLUSH_EVENT: - return new FlushEvent(tableIdSerializer.deserialize(source)); + return new FlushEvent(tableIdSerializer.deserialize(source), source.readLong()); case DATA_CHANGE_EVENT: return dataChangeEventSerializer.deserialize(source); case SCHEME_CHANGE_EVENT: diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/typeutils/NonceUtils.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/typeutils/NonceUtils.java new file mode 100644 index 00000000000..0d9e0b74643 --- /dev/null +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/typeutils/NonceUtils.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.runtime.typeutils; + +import org.apache.flink.cdc.common.annotation.PublicEvolving; +import org.apache.flink.cdc.common.event.Event; +import org.apache.flink.cdc.common.event.FlushEvent; +import org.apache.flink.cdc.common.event.TableId; + +import java.util.Objects; + +/** + * Generates schema evolution nonce value and corresponding {@link FlushEvent}s. It is guaranteed to + * be unique by combining epoch timestamp, subTaskId, Table ID and schema change event into a long + * hashCode. + */ +@PublicEvolving +public class NonceUtils { + + /** + * Generating a nonce value with current {@code timestamp}, {@code subTaskId}, {@code tableId}, + * and {@code schemaChangeEvent}. The higher 32 bits are current UTC timestamp in epoch seconds, + * and the lower 32 bits are Java hashCode of the rest parameters. + */ + public static long generateNonce( + int timestamp, int subTaskId, TableId tableId, Event schemaChangeEvent) { + return (long) timestamp << 32 + | Integer.toUnsignedLong(Objects.hash(subTaskId, tableId, schemaChangeEvent)); + } + + /** Generating a {@link FlushEvent} carrying a nonce.
*/ + public static FlushEvent generateFlushEvent( + int timestamp, int subTaskId, TableId tableId, Event schemaChangeEvent) { + return new FlushEvent( + tableId, generateNonce(timestamp, subTaskId, tableId, schemaChangeEvent)); + } +} diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/SchemaEvolveTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/SchemaEvolveTest.java index e1f6a94c96d..d60b6d55a9d 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/SchemaEvolveTest.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/SchemaEvolveTest.java @@ -25,13 +25,13 @@ import org.apache.flink.cdc.common.event.DataChangeEvent; import org.apache.flink.cdc.common.event.DropColumnEvent; import org.apache.flink.cdc.common.event.Event; -import org.apache.flink.cdc.common.event.FlushEvent; import org.apache.flink.cdc.common.event.RenameColumnEvent; import org.apache.flink.cdc.common.event.SchemaChangeEventType; import org.apache.flink.cdc.common.event.SchemaChangeEventTypeFamily; import org.apache.flink.cdc.common.event.TableId; import org.apache.flink.cdc.common.exceptions.UnsupportedSchemaChangeEventException; import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior; +import org.apache.flink.cdc.common.route.RouteRule; import org.apache.flink.cdc.common.schema.Column; import org.apache.flink.cdc.common.schema.Schema; import org.apache.flink.cdc.common.types.DataType; @@ -39,6 +39,7 @@ import org.apache.flink.cdc.common.types.RowType; import org.apache.flink.cdc.runtime.testutils.operators.EventOperatorTestHarness; import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator; +import org.apache.flink.cdc.runtime.typeutils.NonceUtils; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap; @@ -69,6 +70,26 @@ public class 
SchemaEvolveTest { private static final TableId CUSTOMERS_TABLE_ID = TableId.tableId("my_company", "my_branch", "customers"); + /** + * A mocked schema operator that always yields predictable nonce with incrementing timestamp. + */ + public static class MockedSchemaOperator extends SchemaOperator { + + public MockedSchemaOperator( + List routingRules, + Duration rpcTimeOut, + SchemaChangeBehavior schemaChangeBehavior) { + super(routingRules, rpcTimeOut, schemaChangeBehavior); + } + + private int i = 0; + + @Override + protected int getCurrentTimestamp() { + return ++i; + } + } + /** Tests common evolve schema changes without exceptions. */ @Test public void testEvolveSchema() throws Exception { @@ -84,7 +105,7 @@ public void testEvolveSchema() throws Exception { SchemaChangeBehavior behavior = SchemaChangeBehavior.EVOLVE; SchemaOperator schemaOperator = - new SchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), behavior); + new MockedSchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), behavior); EventOperatorTestHarness harness = new EventOperatorTestHarness<>(schemaOperator, 17, Duration.ofSeconds(3), behavior); harness.open(); @@ -108,7 +129,12 @@ public void testEvolveSchema() throws Exception { Assertions.assertThat( ListUtils.union( - Collections.singletonList(new FlushEvent(tableId)), + Collections.singletonList( + NonceUtils.generateFlushEvent( + 1, + 0, + tableId, + createAndInsertDataEvents.get(0))), createAndInsertDataEvents)) .isEqualTo( harness.getOutputRecords().stream() @@ -168,7 +194,9 @@ public void testEvolveSchema() throws Exception { .collect(Collectors.toList())) .isEqualTo( ListUtils.union( - Collections.singletonList(new FlushEvent(tableId)), + Collections.singletonList( + NonceUtils.generateFlushEvent( + 2, 0, tableId, addColumnEvents.get(0))), addColumnEvents)); Schema schemaV2 = @@ -227,7 +255,9 @@ public void testEvolveSchema() throws Exception { .collect(Collectors.toList())) .isEqualTo( ListUtils.union( - 
Collections.singletonList(new FlushEvent(tableId)), + Collections.singletonList( + NonceUtils.generateFlushEvent( + 3, 0, tableId, renameColumnEvents.get(0))), renameColumnEvents)); Schema schemaV3 = @@ -272,7 +302,9 @@ public void testEvolveSchema() throws Exception { .collect(Collectors.toList())) .isEqualTo( ListUtils.union( - Collections.singletonList(new FlushEvent(tableId)), + Collections.singletonList( + NonceUtils.generateFlushEvent( + 4, 0, tableId, alterColumnTypeEvents.get(0))), alterColumnTypeEvents)); Schema schemaV4 = @@ -308,7 +340,9 @@ tableId, buildRecord(INT, 12, STRING, "Jane", FLOAT, 11f)), .collect(Collectors.toList())) .isEqualTo( ListUtils.union( - Collections.singletonList(new FlushEvent(tableId)), + Collections.singletonList( + NonceUtils.generateFlushEvent( + 5, 0, tableId, dropColumnEvents.get(0))), dropColumnEvents)); Schema schemaV5 = @@ -341,7 +375,7 @@ public void testTryEvolveSchema() throws Exception { SchemaChangeBehavior behavior = SchemaChangeBehavior.TRY_EVOLVE; SchemaOperator schemaOperator = - new SchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), behavior); + new MockedSchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), behavior); EventOperatorTestHarness harness = new EventOperatorTestHarness<>(schemaOperator, 17, Duration.ofSeconds(3), behavior); harness.open(); @@ -369,7 +403,12 @@ public void testTryEvolveSchema() throws Exception { .collect(Collectors.toList())) .isEqualTo( ListUtils.union( - Collections.singletonList(new FlushEvent(tableId)), + Collections.singletonList( + NonceUtils.generateFlushEvent( + 1, + 0, + tableId, + createAndInsertDataEvents.get(0))), createAndInsertDataEvents)); Assertions.assertThat(harness.getLatestOriginalSchema(tableId)).isEqualTo(schemaV1); @@ -425,7 +464,9 @@ public void testTryEvolveSchema() throws Exception { .collect(Collectors.toList())) .isEqualTo( ListUtils.union( - Collections.singletonList(new FlushEvent(tableId)), + Collections.singletonList( + 
NonceUtils.generateFlushEvent( + 2, 0, tableId, addColumnEvents.get(0))), addColumnEvents)); Schema schemaV2 = @@ -484,7 +525,9 @@ public void testTryEvolveSchema() throws Exception { .collect(Collectors.toList())) .isEqualTo( ListUtils.union( - Collections.singletonList(new FlushEvent(tableId)), + Collections.singletonList( + NonceUtils.generateFlushEvent( + 3, 0, tableId, renameColumnEvents.get(0))), renameColumnEvents)); Schema schemaV3 = @@ -529,7 +572,9 @@ public void testTryEvolveSchema() throws Exception { .collect(Collectors.toList())) .isEqualTo( ListUtils.union( - Collections.singletonList(new FlushEvent(tableId)), + Collections.singletonList( + NonceUtils.generateFlushEvent( + 4, 0, tableId, alterColumnTypeEvents.get(0))), alterColumnTypeEvents)); Schema schemaV4 = @@ -565,7 +610,9 @@ tableId, buildRecord(INT, 12, STRING, "Jane", FLOAT, 11f)), .collect(Collectors.toList())) .isEqualTo( ListUtils.union( - Collections.singletonList(new FlushEvent(tableId)), + Collections.singletonList( + NonceUtils.generateFlushEvent( + 5, 0, tableId, dropColumnEvents.get(0))), dropColumnEvents)); Schema schemaV5 = @@ -598,7 +645,7 @@ public void testExceptionEvolveSchema() throws Exception { SchemaChangeBehavior behavior = SchemaChangeBehavior.EXCEPTION; SchemaOperator schemaOperator = - new SchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), behavior); + new MockedSchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), behavior); EventOperatorTestHarness harness = new EventOperatorTestHarness<>(schemaOperator, 17, Duration.ofSeconds(3), behavior); harness.open(); @@ -626,7 +673,12 @@ public void testExceptionEvolveSchema() throws Exception { .collect(Collectors.toList())) .isEqualTo( ListUtils.union( - Collections.singletonList(new FlushEvent(tableId)), + Collections.singletonList( + NonceUtils.generateFlushEvent( + 1, + 0, + tableId, + createAndInsertDataEvents.get(0))), createAndInsertDataEvents)); 
Assertions.assertThat(harness.getLatestOriginalSchema(tableId)).isEqualTo(schemaV1); @@ -707,7 +759,7 @@ public void testIgnoreEvolveSchema() throws Exception { SchemaChangeBehavior behavior = SchemaChangeBehavior.IGNORE; SchemaOperator schemaOperator = - new SchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), behavior); + new MockedSchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), behavior); EventOperatorTestHarness harness = new EventOperatorTestHarness<>(schemaOperator, 17, Duration.ofSeconds(3), behavior); harness.open(); @@ -735,7 +787,12 @@ public void testIgnoreEvolveSchema() throws Exception { .collect(Collectors.toList())) .isEqualTo( ListUtils.union( - Collections.singletonList(new FlushEvent(tableId)), + Collections.singletonList( + NonceUtils.generateFlushEvent( + 1, + 0, + tableId, + createAndInsertDataEvents.get(0))), createAndInsertDataEvents)); Assertions.assertThat(harness.getLatestOriginalSchema(tableId)).isEqualTo(schemaV1); @@ -787,7 +844,7 @@ public void testIgnoreEvolveSchema() throws Exception { List expectedEvents = Arrays.asList( - new FlushEvent(tableId), + NonceUtils.generateFlushEvent(2, 0, tableId, addColumnEvents.get(0)), DataChangeEvent.insertEvent( tableId, buildRecord(INT, 4, STRING, "Derrida", SMALLINT, (short) 20)), @@ -855,7 +912,7 @@ public void testIgnoreEvolveSchema() throws Exception { List expectedEvents = Arrays.asList( - new FlushEvent(tableId), + NonceUtils.generateFlushEvent(3, 0, tableId, renameColumnEvents.get(0)), DataChangeEvent.insertEvent( tableId, buildRecord(INT, 6, STRING, null, SMALLINT, (short) 22)), @@ -904,7 +961,8 @@ public void testIgnoreEvolveSchema() throws Exception { List expectedEvents = Arrays.asList( - new FlushEvent(tableId), + NonceUtils.generateFlushEvent( + 4, 0, tableId, alterColumnTypeEvents.get(0)), DataChangeEvent.insertEvent( tableId, buildRecord(INT, 8, STRING, null, SMALLINT, null)), DataChangeEvent.insertEvent( @@ -945,7 +1003,7 @@ tableId, buildRecord(INT, 12, STRING, 
"Jane", FLOAT, 11f)), List expectedEvents = Arrays.asList( - new FlushEvent(tableId), + NonceUtils.generateFlushEvent(5, 0, tableId, dropColumnEvents.get(0)), DataChangeEvent.insertEvent( tableId, buildRecord(INT, 12, STRING, null, DOUBLE, null)), DataChangeEvent.insertEvent( @@ -986,7 +1044,7 @@ public void testEvolveSchemaWithFailure() throws Exception { SchemaChangeBehavior behavior = SchemaChangeBehavior.EVOLVE; SchemaOperator schemaOperator = - new SchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), behavior); + new MockedSchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), behavior); EventOperatorTestHarness harness = new EventOperatorTestHarness<>( schemaOperator, @@ -1021,7 +1079,9 @@ tableId, buildRecord(INT, 2, STRING, "Bob", SMALLINT, (short) 18)), .collect(Collectors.toList())) .isEqualTo( ListUtils.union( - Collections.singletonList(new FlushEvent(tableId)), + Collections.singletonList( + NonceUtils.generateFlushEvent( + 1, 0, tableId, createAndInsertDataEvents.get(0))), createAndInsertDataEvents)); Assertions.assertThat(harness.getLatestOriginalSchema(tableId)).isEqualTo(schemaV1); @@ -1068,7 +1128,7 @@ public void testTryEvolveSchemaWithFailure() throws Exception { SchemaChangeBehavior behavior = SchemaChangeBehavior.TRY_EVOLVE; SchemaOperator schemaOperator = - new SchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), behavior); + new MockedSchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), behavior); // All types of schema change events will be sent to the sink // AddColumn and RenameColumn events will always fail @@ -1108,7 +1168,12 @@ public void testTryEvolveSchemaWithFailure() throws Exception { .collect(Collectors.toList())) .isEqualTo( ListUtils.union( - Collections.singletonList(new FlushEvent(tableId)), + Collections.singletonList( + NonceUtils.generateFlushEvent( + 1, + 0, + tableId, + createAndInsertDataEvents.get(0))), createAndInsertDataEvents)); 
Assertions.assertThat(harness.getLatestOriginalSchema(tableId)).isEqualTo(schemaV1); @@ -1159,7 +1224,8 @@ public void testTryEvolveSchemaWithFailure() throws Exception { processEvent(schemaOperator, addColumnEvents); List expectedEvents = new ArrayList<>(); - expectedEvents.add(new FlushEvent(tableId)); + expectedEvents.add( + NonceUtils.generateFlushEvent(2, 0, tableId, addColumnEvents.get(0))); expectedEvents.addAll(addColumnEvents); Assertions.assertThat( @@ -1221,7 +1287,8 @@ public void testTryEvolveSchemaWithFailure() throws Exception { processEvent(schemaOperator, renameColumnEvents); List expectedEvents = new ArrayList<>(); - expectedEvents.add(new FlushEvent(tableId)); + expectedEvents.add( + NonceUtils.generateFlushEvent(3, 0, tableId, renameColumnEvents.get(0))); expectedEvents.addAll(renameColumnEvents); Assertions.assertThat( @@ -1266,7 +1333,8 @@ public void testTryEvolveSchemaWithFailure() throws Exception { List expectedEvents = Arrays.asList( - new FlushEvent(tableId), + NonceUtils.generateFlushEvent( + 4, 0, tableId, alterColumnTypeEvents.get(0)), DataChangeEvent.insertEvent( tableId, buildRecord( @@ -1324,7 +1392,7 @@ tableId, buildRecord(INT, 12, STRING, "Jane", FLOAT, 11f)), List expectedEvents = Arrays.asList( - new FlushEvent(tableId), + NonceUtils.generateFlushEvent(5, 0, tableId, dropColumnEvents.get(0)), DataChangeEvent.insertEvent( tableId, buildRecord( @@ -1380,7 +1448,7 @@ public void testFineGrainedSchemaEvolves() throws Exception { SchemaChangeBehavior behavior = SchemaChangeBehavior.EVOLVE; SchemaOperator schemaOperator = - new SchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), behavior); + new MockedSchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), behavior); // All types of schema change events will be sent to the sink // AddColumn and RenameColumn events will always fail @@ -1420,7 +1488,12 @@ public void testFineGrainedSchemaEvolves() throws Exception { .collect(Collectors.toList())) .isEqualTo( ListUtils.union( - 
Collections.singletonList(new FlushEvent(tableId)), + Collections.singletonList( + NonceUtils.generateFlushEvent( + 1, + 0, + tableId, + createAndInsertDataEvents.get(0))), createAndInsertDataEvents)); Assertions.assertThat(harness.getLatestOriginalSchema(tableId)).isEqualTo(schemaV1); @@ -1471,7 +1544,9 @@ public void testFineGrainedSchemaEvolves() throws Exception { processEvent(schemaOperator, addColumnEvents); List expectedEvents = new ArrayList<>(); - expectedEvents.add(new FlushEvent(tableId)); + expectedEvents.add( + NonceUtils.generateFlushEvent(2, 0, tableId, addColumnEvents.get(0))); + expectedEvents.addAll(addColumnEvents); Assertions.assertThat( @@ -1533,7 +1608,8 @@ public void testFineGrainedSchemaEvolves() throws Exception { processEvent(schemaOperator, renameColumnEvents); List expectedEvents = new ArrayList<>(); - expectedEvents.add(new FlushEvent(tableId)); + expectedEvents.add( + NonceUtils.generateFlushEvent(3, 0, tableId, renameColumnEvents.get(0))); expectedEvents.addAll(renameColumnEvents); Assertions.assertThat( @@ -1578,7 +1654,8 @@ public void testFineGrainedSchemaEvolves() throws Exception { List expectedEvents = Arrays.asList( - new FlushEvent(tableId), + NonceUtils.generateFlushEvent( + 4, 0, tableId, alterColumnTypeEvents.get(0)), DataChangeEvent.insertEvent( tableId, buildRecord( @@ -1636,7 +1713,7 @@ tableId, buildRecord(INT, 12, STRING, "Jane", FLOAT, 11f)), List expectedEvents = Arrays.asList( - new FlushEvent(tableId), + NonceUtils.generateFlushEvent(5, 0, tableId, dropColumnEvents.get(0)), DataChangeEvent.insertEvent( tableId, buildRecord( @@ -1692,7 +1769,7 @@ public void testLenientSchemaEvolves() throws Exception { SchemaChangeBehavior behavior = SchemaChangeBehavior.LENIENT; SchemaOperator schemaOperator = - new SchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), behavior); + new MockedSchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), behavior); EventOperatorTestHarness harness = new 
EventOperatorTestHarness<>(schemaOperator, 17, Duration.ofSeconds(3), behavior); harness.open(); @@ -1720,7 +1797,12 @@ public void testLenientSchemaEvolves() throws Exception { .collect(Collectors.toList())) .isEqualTo( ListUtils.union( - Collections.singletonList(new FlushEvent(tableId)), + Collections.singletonList( + NonceUtils.generateFlushEvent( + 1, + 0, + tableId, + createAndInsertDataEvents.get(0))), createAndInsertDataEvents)); Assertions.assertThat(harness.getLatestOriginalSchema(tableId)).isEqualTo(schemaV1); @@ -1776,7 +1858,9 @@ public void testLenientSchemaEvolves() throws Exception { .collect(Collectors.toList())) .isEqualTo( ListUtils.union( - Collections.singletonList(new FlushEvent(tableId)), + Collections.singletonList( + NonceUtils.generateFlushEvent( + 2, 0, tableId, addColumnEvents.get(0))), addColumnEvents)); Schema schemaV2 = @@ -1884,7 +1968,9 @@ public void testLenientSchemaEvolves() throws Exception { .collect(Collectors.toList())) .isEqualTo( ListUtils.union( - Collections.singletonList(new FlushEvent(tableId)), + Collections.singletonList( + NonceUtils.generateFlushEvent( + 3, 0, tableId, renameColumnEvents.get(0))), lenientRenameColumnEvents)); Schema schemaV3 = @@ -1959,7 +2045,9 @@ public void testLenientSchemaEvolves() throws Exception { .collect(Collectors.toList())) .isEqualTo( ListUtils.union( - Collections.singletonList(new FlushEvent(tableId)), + Collections.singletonList( + NonceUtils.generateFlushEvent( + 4, 0, tableId, alterColumnTypeEvents.get(0))), lenientAlterColumnTypeEvents)); Schema schemaV4 = @@ -2018,7 +2106,12 @@ tableId, buildRecord(INT, 12, STRING, "Jane", FLOAT, 11f)), harness.getOutputRecords().stream() .map(StreamRecord::getValue) .collect(Collectors.toList())) - .isEqualTo(lenientDropColumnEvents); + .isEqualTo( + ListUtils.union( + Collections.singletonList( + NonceUtils.generateFlushEvent( + 5, 0, tableId, dropColumnEvents.get(0))), + lenientDropColumnEvents)); Schema schemaV5 = Schema.newBuilder() @@ 
-2062,7 +2155,7 @@ public void testLenientEvolveTweaks() throws Exception { SchemaChangeBehavior behavior = SchemaChangeBehavior.LENIENT; SchemaOperator schemaOperator = - new SchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), behavior); + new MockedSchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), behavior); EventOperatorTestHarness harness = new EventOperatorTestHarness<>(schemaOperator, 17, Duration.ofSeconds(3), behavior); harness.open(); @@ -2100,7 +2193,12 @@ public void testLenientEvolveTweaks() throws Exception { .collect(Collectors.toList())) .isEqualTo( ListUtils.union( - Collections.singletonList(new FlushEvent(tableId)), + Collections.singletonList( + NonceUtils.generateFlushEvent( + 1, + 0, + tableId, + createAndInsertDataEvents.get(0))), createAndInsertDataEvents)); Assertions.assertThat(harness.getLatestOriginalSchema(tableId)).isEqualTo(schemaV1); @@ -2142,7 +2240,9 @@ tableId, buildRecord(INT, 12, INT, 0, SMALLINT, (short) 11)), .collect(Collectors.toList())) .isEqualTo( ListUtils.union( - Collections.singletonList(new FlushEvent(tableId)), + Collections.singletonList( + NonceUtils.generateFlushEvent( + 2, 0, tableId, dropColumnEvents.get(0))), lenientDropColumnEvents)); Schema schemaV2 = @@ -2257,7 +2357,9 @@ tableId, buildRecord(INT, 12, INT, 0, SMALLINT, (short) 11)), .collect(Collectors.toList())) .isEqualTo( ListUtils.union( - Collections.singletonList(new FlushEvent(tableId)), + Collections.singletonList( + NonceUtils.generateFlushEvent( + 3, 0, tableId, addColumnEvents.get(0))), lenientAddColumnEvents)); Schema schemaV3 = @@ -2375,7 +2477,9 @@ tableId, buildRecord(INT, 12, INT, 0, SMALLINT, (short) 11)), .collect(Collectors.toList())) .isEqualTo( ListUtils.union( - Collections.singletonList(new FlushEvent(tableId)), + Collections.singletonList( + NonceUtils.generateFlushEvent( + 4, 0, tableId, renameColumnEvents.get(0))), lenientRenameColumnEvents)); Schema schemaV4 = diff --git 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/partitioning/PrePartitionOperatorTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/partitioning/PrePartitionOperatorTest.java index 38c80914d39..413e180edba 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/partitioning/PrePartitionOperatorTest.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/partitioning/PrePartitionOperatorTest.java @@ -48,13 +48,15 @@ class PrePartitionOperatorTest { .build(); private static final int DOWNSTREAM_PARALLELISM = 5; + private static final long TEST_NONCE = 123456789L; + @Test void testBroadcastingSchemaChangeEvent() throws Exception { try (EventOperatorTestHarness testHarness = createTestHarness()) { // Initialization testHarness.open(); - testHarness.registerTableSchema(CUSTOMERS, CUSTOMERS_SCHEMA); + testHarness.registerTableSchema(CUSTOMERS, CUSTOMERS_SCHEMA, TEST_NONCE); // CreateTableEvent PrePartitionOperator operator = testHarness.getOperator(); @@ -74,11 +76,11 @@ void testBroadcastingFlushEvent() throws Exception { createTestHarness()) { // Initialization testHarness.open(); - testHarness.registerTableSchema(CUSTOMERS, CUSTOMERS_SCHEMA); + testHarness.registerTableSchema(CUSTOMERS, CUSTOMERS_SCHEMA, TEST_NONCE); // FlushEvent PrePartitionOperator operator = testHarness.getOperator(); - FlushEvent flushEvent = new FlushEvent(CUSTOMERS); + FlushEvent flushEvent = new FlushEvent(CUSTOMERS, TEST_NONCE); operator.processElement(new StreamRecord<>(flushEvent)); assertThat(testHarness.getOutputRecords()).hasSize(DOWNSTREAM_PARALLELISM); for (int i = 0; i < DOWNSTREAM_PARALLELISM; i++) { @@ -94,7 +96,7 @@ void testPartitioningDataChangeEvent() throws Exception { createTestHarness()) { // Initialization testHarness.open(); - testHarness.registerTableSchema(CUSTOMERS, CUSTOMERS_SCHEMA); + testHarness.registerTableSchema(CUSTOMERS, CUSTOMERS_SCHEMA, 0L); // DataChangeEvent PrePartitionOperator operator = 
testHarness.getOperator(); diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/event/EventSerializerTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/event/EventSerializerTest.java index a34447155fd..eadb12d5d54 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/event/EventSerializerTest.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/event/EventSerializerTest.java @@ -47,9 +47,9 @@ protected Class getTypeClass() { protected Event[] getTestData() { Event[] flushEvents = new Event[] { - new FlushEvent(TableId.tableId("table")), - new FlushEvent(TableId.tableId("schema", "table")), - new FlushEvent(TableId.tableId("namespace", "schema", "table")) + new FlushEvent(TableId.tableId("table"), 1L), + new FlushEvent(TableId.tableId("schema", "table"), 2L), + new FlushEvent(TableId.tableId("namespace", "schema", "table"), 3L) }; Event[] dataChangeEvents = new DataChangeEventSerializerTest().getTestData(); Event[] schemaChangeEvents = new SchemaChangeEventSerializerTest().getTestData(); diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/event/PartitioningEventSerializerTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/event/PartitioningEventSerializerTest.java index db93ec6831a..6b8a77a12ea 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/event/PartitioningEventSerializerTest.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/event/PartitioningEventSerializerTest.java @@ -51,9 +51,9 @@ protected Class getTypeClass() { protected PartitioningEvent[] getTestData() { Event[] flushEvents = new Event[] { - new FlushEvent(TableId.tableId("table")), - new FlushEvent(TableId.tableId("schema", "table")), - new FlushEvent(TableId.tableId("namespace", "schema", "table")) + new FlushEvent(TableId.tableId("table"), 1L), + 
new FlushEvent(TableId.tableId("schema", "table"), 2L), + new FlushEvent(TableId.tableId("namespace", "schema", "table"), 3L) }; Event[] dataChangeEvents = new DataChangeEventSerializerTest().getTestData(); Event[] schemaChangeEvents = new SchemaChangeEventSerializerTest().getTestData(); diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/EventOperatorTestHarness.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/EventOperatorTestHarness.java index edb64adcd04..c272667c979 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/EventOperatorTestHarness.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/EventOperatorTestHarness.java @@ -173,9 +173,9 @@ public OP getOperator() { return operator; } - public void registerTableSchema(TableId tableId, Schema schema) { + public void registerTableSchema(TableId tableId, Schema schema, long nonce) { schemaRegistry.handleCoordinationRequest( - new SchemaChangeRequest(tableId, new CreateTableEvent(tableId, schema), 0)); + new SchemaChangeRequest(tableId, new CreateTableEvent(tableId, schema), 0, nonce)); schemaRegistry.handleApplyEvolvedSchemaChangeRequest(new CreateTableEvent(tableId, schema)); } @@ -254,7 +254,10 @@ public void collect(StreamRecord record) { schemaRegistryGateway.sendOperatorEventToCoordinator( SINK_OPERATOR_ID, new SerializedValue<>( - new FlushSuccessEvent(0, ((FlushEvent) event).getTableId()))); + new FlushSuccessEvent( + 0, + ((FlushEvent) event).getTableId(), + ((FlushEvent) event).getNonce()))); } catch (IOException e) { throw new RuntimeException(e); } From 2870254f03e537d3925b1b8cee41471e725189be Mon Sep 17 00:00:00 2001 From: yuxiqian <34335406+yuxiqian@users.noreply.github.com> Date: Fri, 13 Dec 2024 17:36:38 +0800 Subject: [PATCH 2/2] Address comments Signed-off-by: yuxiqian <34335406+yuxiqian@users.noreply.github.com> --- 
.../cdc/connectors/mysql/testutils/MySqSourceTestUtils.java | 4 ++-- .../flink/cdc/runtime/operators/schema/SchemaOperator.java | 2 +- .../schema/coordinator/SchemaRegistryRequestHandler.java | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/testutils/MySqSourceTestUtils.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/testutils/MySqSourceTestUtils.java index fee7ac5b9c0..d193341eacc 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/testutils/MySqSourceTestUtils.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/testutils/MySqSourceTestUtils.java @@ -66,8 +66,6 @@ public static String getServerId(int parallelism) { return serverId + "-" + (serverId + parallelism); } - private MySqSourceTestUtils() {} - public static void loopCheck( Supplier runnable, String description, Duration timeout, Duration interval) throws Exception { @@ -81,4 +79,6 @@ public static void loopCheck( throw new TimeoutException( "Ran out of time when waiting for " + description + " to success."); } + + private MySqSourceTestUtils() {} } diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java index 09bac82faf0..179a71e2dfa 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java @@ -500,7 +500,7 @@ private SchemaChangeResponse requestSchemaChange( 
Thread.sleep(1000); } else if (response.isWaitingForFlush()) { LOG.info( - "{}> Schema change event (with once {}) has not collected enough flush success events from writers, waiting...", + "{}> Schema change event (with nonce {}) has not collected enough flush success events from writers, waiting...", subTaskId, nonce); Thread.sleep(1000); diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java index 1e4d231b2f7..2faba42caef 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java @@ -148,7 +148,7 @@ public void handleSchemaChangeRequest( LOG.info("Event {} has been addressed before, ignoring it.", event); clearCurrentSchemaChangeRequest(nonce); LOG.info( - "SchemaChangeStatus switched from WAITING_FOR_FLUSH to IDLE for request {} due to duplicated request.", + "SchemaChangeStatus is still IDLE for request {} due to duplicated request.", request); response.complete(wrap(SchemaChangeResponse.duplicate())); return; @@ -183,7 +183,7 @@ public void handleSchemaChangeRequest( LOG.info("Event {} is omitted from sending to downstream, ignoring it.", event); clearCurrentSchemaChangeRequest(nonce); LOG.info( - "SchemaChangeStatus switched from WAITING_FOR_FLUSH to IDLE for request {} due to ignored request.", + "SchemaChangeStatus is still IDLE for request {} due to ignored request.", request); response.complete(wrap(SchemaChangeResponse.ignored()));