From 69ca25247ae194456c25404797c916e6a2aea61d Mon Sep 17 00:00:00 2001 From: yuxiqian <34335406+yuxiqian@users.noreply.github.com> Date: Thu, 22 Aug 2024 15:32:43 +0800 Subject: [PATCH 1/2] [FLINK-36128] Promote `LENIENT` as the default schema change behavior --- .../parser/YamlPipelineDefinitionParser.java | 48 +++++++++++++++---- .../YamlPipelineDefinitionParserTest.java | 36 ++++++++++++-- .../common/event/SchemaChangeEventType.java | 24 +++++++--- .../cdc/common/pipeline/PipelineOptions.java | 2 +- .../cdc/common/utils/ChangeEventUtils.java | 8 +--- .../common/utils/ChangeEventUtilsTest.java | 18 +++++-- .../flink/FlinkPipelineComposerITCase.java | 21 ++++++++ .../flink/FlinkPipelineTransformITCase.java | 13 +++++ .../flink/FlinkPipelineUdfITCase.java | 21 ++++++++ .../cdc/pipeline/tests/MysqlE2eITCase.java | 3 +- .../pipeline/tests/SchemaEvolveE2eITCase.java | 1 - .../SchemaEvolvingTransformE2eITCase.java | 11 +++-- .../pipeline/tests/TransformE2eITCase.java | 18 ++++--- .../SchemaRegistryRequestHandler.java | 4 -- 14 files changed, 179 insertions(+), 49 deletions(-) diff --git a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java index d5df8eda14d..8179fdbffec 100644 --- a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java +++ b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java @@ -19,6 +19,8 @@ import org.apache.flink.cdc.common.configuration.Configuration; import org.apache.flink.cdc.common.event.SchemaChangeEventType; +import org.apache.flink.cdc.common.event.SchemaChangeEventTypeFamily; +import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior; import org.apache.flink.cdc.common.utils.StringUtils; import org.apache.flink.cdc.composer.definition.PipelineDef; import org.apache.flink.cdc.composer.definition.RouteDef; @@ -35,11 +37,14 @@ import java.nio.file.Path; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.stream.Stream; +import static org.apache.flink.cdc.common.pipeline.PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR; import static org.apache.flink.cdc.common.utils.ChangeEventUtils.resolveSchemaEvolutionOptions; import static org.apache.flink.cdc.common.utils.Preconditions.checkNotNull; @@ -99,6 +104,19 @@ public PipelineDef parse(String pipelineDefText, Configuration globalPipelineCon private PipelineDef parse(JsonNode pipelineDefJsonNode, Configuration globalPipelineConfig) throws Exception { + + // UDFs are optional. We parse UDF first and remove it from the pipelineDefJsonNode since + // it's not of plain data types and must be removed before calling toPipelineConfig. + List udfDefs = new ArrayList<>(); + Optional.ofNullable(((ObjectNode) pipelineDefJsonNode.get(PIPELINE_KEY)).remove(UDF_KEY)) + .ifPresent(node -> node.forEach(udf -> udfDefs.add(toUdfDef(udf)))); + + // Pipeline configs are optional + Configuration userPipelineConfig = toPipelineConfig(pipelineDefJsonNode.get(PIPELINE_KEY)); + + SchemaChangeBehavior schemaChangeBehavior = + userPipelineConfig.get(PIPELINE_SCHEMA_CHANGE_BEHAVIOR); + // Source is required SourceDef sourceDef = toSourceDef( @@ -113,7 +131,8 @@ private PipelineDef parse(JsonNode pipelineDefJsonNode, Configuration globalPipe checkNotNull( pipelineDefJsonNode.get(SINK_KEY), "Missing required field \"%s\" in pipeline definition", - SINK_KEY)); + SINK_KEY), + schemaChangeBehavior); // Transforms are optional List transformDefs = new ArrayList<>(); @@ -128,14 +147,6 @@ private PipelineDef parse(JsonNode pipelineDefJsonNode, Configuration globalPipe Optional.ofNullable(pipelineDefJsonNode.get(ROUTE_KEY)) .ifPresent(node -> node.forEach(route -> routeDefs.add(toRouteDef(route)))); - // UDFs are optional - List udfDefs = new ArrayList<>(); - Optional.ofNullable(((ObjectNode) pipelineDefJsonNode.get(PIPELINE_KEY)).remove(UDF_KEY)) - .ifPresent(node -> node.forEach(udf -> udfDefs.add(toUdfDef(udf)))); - - // Pipeline configs are optional - Configuration userPipelineConfig = toPipelineConfig(pipelineDefJsonNode.get(PIPELINE_KEY)); - // Merge user config into global config Configuration pipelineConfig = new Configuration(); pipelineConfig.addAll(globalPipelineConfig); @@ -162,7 +173,7 @@ private SourceDef toSourceDef(JsonNode sourceNode) { return new SourceDef(type, name, Configuration.fromMap(sourceMap)); } - private SinkDef toSinkDef(JsonNode sinkNode) { + private SinkDef toSinkDef(JsonNode sinkNode, SchemaChangeBehavior schemaChangeBehavior) { List includedSETypes = new ArrayList<>(); List excludedSETypes = new ArrayList<>(); @@ -172,6 +183,23 @@ private SinkDef toSinkDef(JsonNode sinkNode) { Optional.ofNullable(sinkNode.get(EXCLUDE_SCHEMA_EVOLUTION_TYPES)) .ifPresent(e -> e.forEach(tag -> excludedSETypes.add(tag.asText()))); + if (includedSETypes.isEmpty()) { + // If no schema evolution types are specified, include all schema evolution types by + // default. + Arrays.stream(SchemaChangeEventTypeFamily.ALL) + .map(SchemaChangeEventType::getTag) + .forEach(includedSETypes::add); + } + + if (excludedSETypes.isEmpty() + && SchemaChangeBehavior.LENIENT.equals(schemaChangeBehavior)) { + // In lenient mode, we exclude DROP_TABLE and TRUNCATE_TABLE by default. This could be + // overridden by manually specifying excluded types. + Stream.of(SchemaChangeEventType.DROP_TABLE, SchemaChangeEventType.TRUNCATE_TABLE) + .map(SchemaChangeEventType::getTag) + .forEach(excludedSETypes::add); + } + Set declaredSETypes = resolveSchemaEvolutionOptions(includedSETypes, excludedSETypes); diff --git a/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java b/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java index f57dd62c7ad..2ecf4587074 100644 --- a/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java +++ b/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java @@ -27,6 +27,7 @@ import org.apache.flink.cdc.composer.definition.UdfDef; import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap; +import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableSet; import org.apache.flink.shaded.guava31.com.google.common.io.Resources; import org.junit.jupiter.api.Test; @@ -37,6 +38,11 @@ import java.util.Arrays; import java.util.Collections; +import static org.apache.flink.cdc.common.event.SchemaChangeEventType.ADD_COLUMN; +import static org.apache.flink.cdc.common.event.SchemaChangeEventType.ALTER_COLUMN_TYPE; +import static org.apache.flink.cdc.common.event.SchemaChangeEventType.CREATE_TABLE; +import static org.apache.flink.cdc.common.event.SchemaChangeEventType.DROP_COLUMN; +import static org.apache.flink.cdc.common.event.SchemaChangeEventType.RENAME_COLUMN; import static org.apache.flink.cdc.common.pipeline.PipelineOptions.PIPELINE_LOCAL_TIME_ZONE; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy; @@ -384,7 +390,13 @@ void testParsingFullDefinitionFromString() throws Exception { Configuration.fromMap( ImmutableMap.builder() .put("bootstrap-servers", "localhost:9092") - .build())), + .build()), + ImmutableSet.of( + DROP_COLUMN, + ALTER_COLUMN_TYPE, + ADD_COLUMN, + CREATE_TABLE, + RENAME_COLUMN)), Collections.singletonList( new RouteDef( "mydb.default.app_order_.*", @@ -401,7 +413,16 @@ void testParsingFullDefinitionFromString() throws Exception { private final PipelineDef minimizedDef = new PipelineDef( new SourceDef("mysql", null, new Configuration()), - new SinkDef("kafka", null, new Configuration()), + new SinkDef( + "kafka", + null, + new Configuration(), + ImmutableSet.of( + DROP_COLUMN, + ALTER_COLUMN_TYPE, + ADD_COLUMN, + CREATE_TABLE, + RENAME_COLUMN)), Collections.emptyList(), Collections.emptyList(), Collections.emptyList(), @@ -474,7 +495,16 @@ void testParsingFullDefinitionFromString() throws Exception { private final PipelineDef pipelineDefWithUdf = new PipelineDef( new SourceDef("values", null, new Configuration()), - new SinkDef("values", null, new Configuration()), + new SinkDef( + "values", + null, + new Configuration(), + ImmutableSet.of( + DROP_COLUMN, + ALTER_COLUMN_TYPE, + ADD_COLUMN, + CREATE_TABLE, + RENAME_COLUMN)), Collections.emptyList(), Collections.singletonList( new TransformDef( diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/SchemaChangeEventType.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/SchemaChangeEventType.java index 8132c29a3fd..bbe4b415c6a 100644 --- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/SchemaChangeEventType.java +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/SchemaChangeEventType.java @@ -22,13 +22,23 @@ /** An enumeration of schema change event types for {@link SchemaChangeEvent}. */ @PublicEvolving public enum SchemaChangeEventType { - ADD_COLUMN, - ALTER_COLUMN_TYPE, - CREATE_TABLE, - DROP_COLUMN, - DROP_TABLE, - RENAME_COLUMN, - TRUNCATE_TABLE; + ADD_COLUMN("add.column"), + ALTER_COLUMN_TYPE("alter.column.type"), + CREATE_TABLE("create.table"), + DROP_COLUMN("drop.column"), + DROP_TABLE("drop.table"), + RENAME_COLUMN("rename.column"), + TRUNCATE_TABLE("truncate.table"); + + private final String tag; + + SchemaChangeEventType(String tag) { + this.tag = tag; + } + + public String getTag() { + return tag; + } public static SchemaChangeEventType ofEvent(SchemaChangeEvent event) { if (event instanceof AddColumnEvent) { diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/PipelineOptions.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/PipelineOptions.java index 48a4fbb13a3..343e9927075 100644 --- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/PipelineOptions.java +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/PipelineOptions.java @@ -48,7 +48,7 @@ public class PipelineOptions { public static final ConfigOption PIPELINE_SCHEMA_CHANGE_BEHAVIOR = ConfigOptions.key("schema.change.behavior") .enumType(SchemaChangeBehavior.class) - .defaultValue(SchemaChangeBehavior.EVOLVE) + .defaultValue(SchemaChangeBehavior.LENIENT) .withDescription( Description.builder() .text("Behavior for handling schema change events. ") diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/ChangeEventUtils.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/ChangeEventUtils.java index 940dc31448a..483752ce92e 100644 --- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/ChangeEventUtils.java +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/ChangeEventUtils.java @@ -90,12 +90,8 @@ public static Set resolveSchemaEvolutionOptions( List includedSchemaEvolutionTypes, List excludedSchemaEvolutionTypes) { List resultTypes = new ArrayList<>(); - if (includedSchemaEvolutionTypes.isEmpty()) { - resultTypes.addAll(Arrays.asList(SchemaChangeEventTypeFamily.ALL)); - } else { - for (String includeTag : includedSchemaEvolutionTypes) { - resultTypes.addAll(resolveSchemaEvolutionTag(includeTag)); - } + for (String includeTag : includedSchemaEvolutionTypes) { + resultTypes.addAll(resolveSchemaEvolutionTag(includeTag)); } for (String excludeTag : excludedSchemaEvolutionTypes) { diff --git a/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/ChangeEventUtilsTest.java b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/ChangeEventUtilsTest.java index fd3636191d5..5cdff9c5211 100644 --- a/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/ChangeEventUtilsTest.java +++ b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/ChangeEventUtilsTest.java @@ -17,11 +17,16 @@ package org.apache.flink.cdc.common.utils; +import org.apache.flink.cdc.common.event.SchemaChangeEventType; +import org.apache.flink.cdc.common.event.SchemaChangeEventTypeFamily; + import org.assertj.core.util.Sets; import org.junit.jupiter.api.Test; import java.util.Arrays; import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; import static org.apache.flink.cdc.common.event.SchemaChangeEventType.ADD_COLUMN; import static org.apache.flink.cdc.common.event.SchemaChangeEventType.ALTER_COLUMN_TYPE; @@ -36,9 +41,12 @@ public class ChangeEventUtilsTest { @Test public void testResolveSchemaEvolutionOptions() { - assertThat( - ChangeEventUtils.resolveSchemaEvolutionOptions( - Collections.emptyList(), Collections.emptyList())) + + List allTags = + Arrays.stream(SchemaChangeEventTypeFamily.ALL) + .map(SchemaChangeEventType::getTag) + .collect(Collectors.toList()); + assertThat(ChangeEventUtils.resolveSchemaEvolutionOptions(allTags, Collections.emptyList())) .isEqualTo( Sets.set( TRUNCATE_TABLE, @@ -51,7 +59,7 @@ public void testResolveSchemaEvolutionOptions() { assertThat( ChangeEventUtils.resolveSchemaEvolutionOptions( - Collections.emptyList(), Collections.singletonList("drop"))) + allTags, Collections.singletonList("drop"))) .isEqualTo( Sets.set( ADD_COLUMN, @@ -73,7 +81,7 @@ public void testResolveSchemaEvolutionOptions() { assertThat( ChangeEventUtils.resolveSchemaEvolutionOptions( - Collections.emptyList(), Collections.singletonList("drop.column"))) + allTags, Collections.singletonList("drop.column"))) .isEqualTo( Sets.set( ADD_COLUMN, diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java index 5c2d48fe5cf..81d466aab82 100644 --- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java +++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java @@ -26,6 +26,7 @@ import org.apache.flink.cdc.common.event.RenameColumnEvent; import org.apache.flink.cdc.common.event.TableId; import org.apache.flink.cdc.common.pipeline.PipelineOptions; +import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior; import org.apache.flink.cdc.common.schema.Column; import org.apache.flink.cdc.common.schema.Schema; import org.apache.flink.cdc.common.types.DataType; @@ -134,6 +135,8 @@ void testSingleSplitSingleTable(ValuesDataSink.SinkApi sinkApi) throws Exception // Setup pipeline Configuration pipelineConfig = new Configuration(); pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); + pipelineConfig.set( + PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); PipelineDef pipelineDef = new PipelineDef( sourceDef, @@ -191,6 +194,8 @@ void testSingleSplitMultipleTables(ValuesDataSink.SinkApi sinkApi) throws Except // Setup pipeline Configuration pipelineConfig = new Configuration(); pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); + pipelineConfig.set( + PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); PipelineDef pipelineDef = new PipelineDef( sourceDef, @@ -313,6 +318,8 @@ void testTransform(ValuesDataSink.SinkApi sinkApi) throws Exception { // Setup pipeline Configuration pipelineConfig = new Configuration(); pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); + pipelineConfig.set( + PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); PipelineDef pipelineDef = new PipelineDef( sourceDef, @@ -373,6 +380,8 @@ void testOpTypeMetadataColumn(ValuesDataSink.SinkApi sinkApi) throws Exception { // Setup pipeline Configuration pipelineConfig = new Configuration(); pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); + pipelineConfig.set( + PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); PipelineDef pipelineDef = new PipelineDef( sourceDef, @@ -441,6 +450,8 @@ void testTransformTwice(ValuesDataSink.SinkApi sinkApi) throws Exception { // Setup pipeline Configuration pipelineConfig = new Configuration(); pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); + pipelineConfig.set( + PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); PipelineDef pipelineDef = new PipelineDef( sourceDef, @@ -496,6 +507,8 @@ void testOneToOneRouting() throws Exception { // Setup pipeline Configuration pipelineConfig = new Configuration(); pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); + pipelineConfig.set( + PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); PipelineDef pipelineDef = new PipelineDef( sourceDef, @@ -569,6 +582,8 @@ void testIdenticalOneToOneRouting() throws Exception { // Setup pipeline Configuration pipelineConfig = new Configuration(); pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); + pipelineConfig.set( + PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); PipelineDef pipelineDef = new PipelineDef( sourceDef, @@ -766,6 +781,8 @@ void testMergingWithRoute() throws Exception { // Setup pipeline Configuration pipelineConfig = new Configuration(); pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); + pipelineConfig.set( + PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); PipelineDef pipelineDef = new PipelineDef( sourceDef, @@ -972,6 +989,8 @@ void testTransformMergingWithRoute() throws Exception { // Setup pipeline Configuration pipelineConfig = new Configuration(); pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); + pipelineConfig.set( + PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); PipelineDef pipelineDef = new PipelineDef( sourceDef, @@ -1035,6 +1054,8 @@ void testRouteWithReplaceSymbol(ValuesDataSink.SinkApi sinkApi) throws Exception // Setup pipeline Configuration pipelineConfig = new Configuration(); pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); + pipelineConfig.set( + PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); PipelineDef pipelineDef = new PipelineDef( sourceDef, diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java index c489b2e8196..4c2536011f0 100644 --- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java +++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java @@ -29,6 +29,7 @@ import org.apache.flink.cdc.common.event.RenameColumnEvent; import org.apache.flink.cdc.common.event.TableId; import org.apache.flink.cdc.common.pipeline.PipelineOptions; +import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior; import org.apache.flink.cdc.common.schema.Column; import org.apache.flink.cdc.common.schema.Schema; import org.apache.flink.cdc.common.types.DataType; @@ -213,6 +214,8 @@ void testTransformWithTemporalFunction() throws Exception { // Setup pipeline Configuration pipelineConfig = new Configuration(); pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); + pipelineConfig.set( + PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); pipelineConfig.set(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE, "America/Los_Angeles"); PipelineDef pipelineDef = new PipelineDef( @@ -266,6 +269,8 @@ void testVanillaTransformWithSchemaEvolution() throws Exception { // Setup pipeline Configuration pipelineConfig = new Configuration(); pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); + pipelineConfig.set( + PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); PipelineDef pipelineDef = new PipelineDef( sourceDef, @@ -350,6 +355,8 @@ void testWildcardTransformWithSchemaEvolution() throws Exception { // Setup pipeline Configuration pipelineConfig = new Configuration(); pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); + pipelineConfig.set( + PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); PipelineDef pipelineDef = new PipelineDef( sourceDef, @@ -436,6 +443,8 @@ void testExplicitTransformWithSchemaEvolution() throws Exception { // Setup pipeline Configuration pipelineConfig = new Configuration(); pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); + pipelineConfig.set( + PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); PipelineDef pipelineDef = new PipelineDef( sourceDef, @@ -516,6 +525,8 @@ void testPreAsteriskWithSchemaEvolution() throws Exception { // Setup pipeline Configuration pipelineConfig = new Configuration(); pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); + pipelineConfig.set( + PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); PipelineDef pipelineDef = new PipelineDef( sourceDef, @@ -608,6 +619,8 @@ void testPostAsteriskWithSchemaEvolution() throws Exception { // Setup pipeline Configuration pipelineConfig = new Configuration(); pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); + pipelineConfig.set( + PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); PipelineDef pipelineDef = new PipelineDef( sourceDef, diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineUdfITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineUdfITCase.java index f9be7d7dd31..c3b412dc781 100644 --- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineUdfITCase.java +++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineUdfITCase.java @@ -19,6 +19,7 @@ import org.apache.flink.cdc.common.configuration.Configuration; import org.apache.flink.cdc.common.pipeline.PipelineOptions; +import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior; import org.apache.flink.cdc.composer.PipelineExecution; import org.apache.flink.cdc.composer.definition.PipelineDef; import org.apache.flink.cdc.composer.definition.SinkDef; @@ -139,6 +140,8 @@ void testTransformWithUdf(ValuesDataSink.SinkApi sinkApi, String language) throw // Setup pipeline Configuration pipelineConfig = new Configuration(); pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); + pipelineConfig.set( + PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); PipelineDef pipelineDef = new PipelineDef( sourceDef, @@ -207,6 +210,8 @@ void testFilterWithUdf(ValuesDataSink.SinkApi sinkApi, String language) throws E // Setup pipeline Configuration pipelineConfig = new Configuration(); pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); + pipelineConfig.set( + PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); PipelineDef pipelineDef = new PipelineDef( sourceDef, @@ -273,6 +278,8 @@ void testOverloadedUdf(ValuesDataSink.SinkApi sinkApi, String language) throws E // Setup pipeline Configuration pipelineConfig = new Configuration(); pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); + pipelineConfig.set( + PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); PipelineDef pipelineDef = new PipelineDef( sourceDef, @@ -341,6 +348,8 @@ void testUdfLifecycle(ValuesDataSink.SinkApi sinkApi, String language) throws Ex // Setup pipeline Configuration pipelineConfig = new Configuration(); pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); + pipelineConfig.set( + PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); PipelineDef pipelineDef = new PipelineDef( sourceDef, @@ -411,6 +420,8 @@ void testTypeHintedUdf(ValuesDataSink.SinkApi sinkApi, String language) throws E // Setup pipeline Configuration pipelineConfig = new Configuration(); pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); + pipelineConfig.set( + PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); PipelineDef pipelineDef = new PipelineDef( sourceDef, @@ -472,6 +483,8 @@ void testComplicatedUdf(ValuesDataSink.SinkApi sinkApi, String language) throws // Setup pipeline Configuration pipelineConfig = new Configuration(); pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); + pipelineConfig.set( + PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); PipelineDef pipelineDef = new PipelineDef( sourceDef, @@ -565,6 +578,8 @@ void testTransformWithFlinkUdf(ValuesDataSink.SinkApi sinkApi, String language) // Setup pipeline Configuration pipelineConfig = new Configuration(); pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); + pipelineConfig.set( + PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); PipelineDef pipelineDef = new PipelineDef( sourceDef, @@ -632,6 +647,8 @@ void testFilterWithFlinkUdf(ValuesDataSink.SinkApi sinkApi, String language) thr // Setup pipeline Configuration pipelineConfig = new Configuration(); pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); + pipelineConfig.set( + PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); PipelineDef pipelineDef = new PipelineDef( sourceDef, @@ -697,6 +714,8 @@ void testOverloadedFlinkUdf(ValuesDataSink.SinkApi sinkApi, String language) thr // Setup pipeline Configuration pipelineConfig = new Configuration(); pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); + pipelineConfig.set( + PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); PipelineDef pipelineDef = new PipelineDef( sourceDef, @@ -758,6 +777,8 @@ void testComplicatedFlinkUdf(ValuesDataSink.SinkApi sinkApi, String language) th // Setup pipeline Configuration pipelineConfig = new Configuration(); pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); + pipelineConfig.set( + PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); PipelineDef pipelineDef = new PipelineDef( sourceDef, diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java index 1f730be6259..29fe5db9d6e 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java @@ -210,7 +210,8 @@ public void testSchemaChangeEvents() throws Exception { + " type: values\n" + "\n" + "pipeline:\n" - + " parallelism: %d", + + " parallelism: %d\n" + + " schema.change.behavior: evolve", INTER_CONTAINER_MYSQL_ALIAS, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD, diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java index 92c2622c9b2..7551add0862 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java @@ -190,7 +190,6 @@ public void testLenientSchemaEvolution() throws Exception { "AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`precise_age` DOUBLE, position=LAST, existedColumnName=null}]}", "AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`biological_sex` TINYINT, position=LAST, existedColumnName=null}]}", "DataChangeEvent{tableId=%s.members, before=[], after=[1013, Fiona, null, null, 16.0, null], op=INSERT, meta=()}", - "TruncateTableEvent{tableId=%s.members}", "DataChangeEvent{tableId=%s.members, before=[], after=[1014, Gem, null, null, 17.0, null], op=INSERT, meta=()}")); assertNotExists( diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolvingTransformE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolvingTransformE2eITCase.java index 1d1f79f0ffe..50d5dfb1d02 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolvingTransformE2eITCase.java +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolvingTransformE2eITCase.java @@ -189,7 +189,6 @@ public void testLenientSchemaEvolution() throws Exception { "AlterColumnTypeEvent{tableId=%s.members, typeMapping={age=DOUBLE}, oldTypeMapping={age=INT}}", "AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`biological_sex` TINYINT, position=LAST, existedColumnName=null}]}", "DataChangeEvent{tableId=%s.members, before=[], after=[1013 -> Fiona, 1013, Fiona, 16.0, 1026169, age < 20, null, null], op=INSERT, meta=()}", - "TruncateTableEvent{tableId=%s.members}", "DataChangeEvent{tableId=%s.members, before=[], after=[1014 -> Gem, 1014, Gem, 17.0, 1028196, age < 20, null, null], op=INSERT, meta=()}")); } @@ -233,11 +232,12 @@ public void testUnexpectedBehavior() { + "\n" + "pipeline:\n" + " schema.change.behavior: unexpected\n" - + " parallelism: 1", + + " parallelism: %d", INTER_CONTAINER_MYSQL_ALIAS, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD, - schemaEvolveDatabase.getDatabaseName()); + schemaEvolveDatabase.getDatabaseName(), + parallelism); Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar"); Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar"); Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar"); @@ -310,7 +310,7 @@ private void testGenericSchemaEvolution( + "\n" + "pipeline:\n" + " schema.change.behavior: %s\n" - + " parallelism: 1", + + " parallelism: %d", INTER_CONTAINER_MYSQL_ALIAS, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD, @@ -318,7 +318,8 @@ private void testGenericSchemaEvolution( mergeTable ? "(members|new_members)" : "members", dbName, dbName, - behavior); + behavior, + parallelism); Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar"); Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar"); Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar"); diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java index 9c613035925..36e7d9a861e 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java @@ -738,12 +738,14 @@ public void testTransformWithSchemaEvolution() throws Exception { + " projection: ID, 'id -> ' || ID AS UID, PRICEALPHA AS PRICE\n" + " filter: ID > 1008\n" + "pipeline:\n" - + " parallelism: 1", + + " parallelism: %d\n" + + " schema.change.behavior: evolve", INTER_CONTAINER_MYSQL_ALIAS, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD, transformTestDatabase.getDatabaseName(), - transformTestDatabase.getDatabaseName()); + transformTestDatabase.getDatabaseName(), + parallelism); Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar"); Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar"); Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar"); @@ -835,12 +837,14 @@ public void testTransformWildcardPrefixWithSchemaEvolution() throws Exception { + " projection: \\*, 'id -> ' || ID AS UID\n" + " filter: ID > 1008\n" + "pipeline:\n" - + " parallelism: 1", + + " parallelism: %d\n" + + " schema.change.behavior: evolve", INTER_CONTAINER_MYSQL_ALIAS, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD, transformTestDatabase.getDatabaseName(), - transformTestDatabase.getDatabaseName()); + transformTestDatabase.getDatabaseName(), + parallelism); Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar"); Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar"); Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar"); @@ -940,12 +944,14 @@ public void testTransformWildcardSuffixWithSchemaEvolution() throws Exception { + " projection: ID || ' <- id' AS UID, *\n" + " filter: ID > 1008\n" + "pipeline:\n" - + " parallelism: 1", + + " parallelism: %d\n" + + " schema.change.behavior: evolve", INTER_CONTAINER_MYSQL_ALIAS, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD, transformTestDatabase.getDatabaseName(), - transformTestDatabase.getDatabaseName()); + transformTestDatabase.getDatabaseName(), + parallelism); Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar"); Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar"); Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar"); diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java index 444fb41d21d..ae765bae2a9 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java @@ -403,10 +403,6 @@ private List lenientizeSchemaChangeEvent(SchemaChangeEvent ev } return events; } - case DROP_TABLE: - // We don't drop any tables in Lenient mode. - LOG.info("A drop table event {} has been ignored in Lenient mode.", event); - return Collections.emptyList(); default: return Collections.singletonList(event); } From bdf051bf57c57faccc5978b2d6c1dd6492ae5f8a Mon Sep 17 00:00:00 2001 From: yuxiqian <34335406+yuxiqian@users.noreply.github.com> Date: Sun, 25 Aug 2024 16:34:57 +0800 Subject: [PATCH 2/2] [hotfix] Fix pre-partition broadcasting failure due to shallow-copying --- .../cdc/runtime/partitioning/PrePartitionOperator.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/PrePartitionOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/PrePartitionOperator.java index 6d66fa87850..1171ad07b48 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/PrePartitionOperator.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/PrePartitionOperator.java @@ -28,6 +28,7 @@ import org.apache.flink.cdc.common.schema.Schema; import org.apache.flink.cdc.runtime.operators.schema.SchemaOperator; import org.apache.flink.cdc.runtime.operators.sink.SchemaEvolutionClient; +import org.apache.flink.cdc.runtime.serializer.event.EventSerializer; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; @@ -108,7 +109,10 @@ private void partitionBy(DataChangeEvent dataChangeEvent) throws Exception { private void broadcastEvent(Event toBroadcast) { for (int i = 0; i < downstreamParallelism; i++) { - output.collect(new StreamRecord<>(new PartitioningEvent(toBroadcast, i))); + // Deep-copying each event is required since downstream subTasks might run in the same + // JVM + Event copiedEvent = EventSerializer.INSTANCE.copy(toBroadcast); + output.collect(new StreamRecord<>(new PartitioningEvent(copiedEvent, i))); } }