diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/SchemaOperatorTranslator.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/SchemaOperatorTranslator.java index a69741c7b4a..903c5dbe004 100644 --- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/SchemaOperatorTranslator.java +++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/SchemaOperatorTranslator.java @@ -20,7 +20,6 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.cdc.common.annotation.Internal; import org.apache.flink.cdc.common.event.Event; -import org.apache.flink.cdc.common.event.SchemaChangeEvent; import org.apache.flink.cdc.common.event.TableId; import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior; import org.apache.flink.cdc.common.sink.MetadataApplier; @@ -57,18 +56,7 @@ public DataStream translate( int parallelism, MetadataApplier metadataApplier, List routes) { - switch (schemaChangeBehavior) { - case EVOLVE: - return addSchemaOperator(input, parallelism, metadataApplier, routes); - case IGNORE: - return dropSchemaChangeEvent(input, parallelism); - case EXCEPTION: - return
exceptionOnSchemaChange(input, parallelism); - default: - throw new IllegalArgumentException( - String.format( - "Unrecognized schema change behavior: %s", schemaChangeBehavior)); - } + return addSchemaOperator(input, parallelism, metadataApplier, routes); } public String getSchemaOperatorUid() { @@ -89,27 +77,9 @@ private DataStream addSchemaOperator( input.transform( "SchemaOperator", new EventTypeInfo(), - new SchemaOperatorFactory(metadataApplier, routingRules, rpcTimeOut)); + new SchemaOperatorFactory( + metadataApplier, routingRules, rpcTimeOut, schemaChangeBehavior)); stream.uid(schemaOperatorUid).setParallelism(parallelism); return stream; } - - private DataStream dropSchemaChangeEvent(DataStream input, int parallelism) { - return input.filter(event -> !(event instanceof SchemaChangeEvent)) - .setParallelism(parallelism); - } - - private DataStream exceptionOnSchemaChange(DataStream input, int parallelism) { - return input.map( - event -> { - if (event instanceof SchemaChangeEvent) { - throw new RuntimeException( - String.format( - "Aborting execution as the pipeline encountered a schema change event: %s", - event)); - } - return event; - }) - .setParallelism(parallelism); - } } diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperatorFactory.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperatorFactory.java index ecf50017156..f4f48d06984 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperatorFactory.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperatorFactory.java @@ -21,6 +21,7 @@ import org.apache.flink.cdc.common.annotation.Internal; import org.apache.flink.cdc.common.event.Event; import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior; import org.apache.flink.cdc.common.sink.MetadataApplier; import
org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistryProvider; import org.apache.flink.runtime.jobgraph.OperatorID; @@ -41,19 +42,23 @@ public class SchemaOperatorFactory extends SimpleOperatorFactory private final MetadataApplier metadataApplier; private final List> routingRules; + private final SchemaChangeBehavior schemaChangeBehavior; public SchemaOperatorFactory( MetadataApplier metadataApplier, List> routingRules, - Duration rpcTimeOut) { + Duration rpcTimeOut, + SchemaChangeBehavior schemaChangeBehavior) { super(new SchemaOperator(routingRules, rpcTimeOut)); this.metadataApplier = metadataApplier; this.routingRules = routingRules; + this.schemaChangeBehavior = schemaChangeBehavior; } @Override public OperatorCoordinator.Provider getCoordinatorProvider( String operatorName, OperatorID operatorID) { - return new SchemaRegistryProvider(operatorID, operatorName, metadataApplier, routingRules); + return new SchemaRegistryProvider( + operatorID, operatorName, metadataApplier, routingRules, schemaChangeBehavior); } } diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java index 3361c9454ab..55e3c6cfd65 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java @@ -19,6 +19,7 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior; import org.apache.flink.cdc.common.schema.Selectors; import org.apache.flink.cdc.common.sink.MetadataApplier; import org.apache.flink.cdc.runtime.operators.schema.SchemaOperator; @@ -90,6 +91,8 @@ public class SchemaRegistry implements OperatorCoordinator,
CoordinationRequestH private final List> routes; + private final SchemaChangeBehavior schemaChangeBehavior; + /** The request handler that handle all requests and events. */ private SchemaRegistryRequestHandler requestHandler; @@ -103,15 +106,26 @@ public SchemaRegistry( OperatorCoordinator.Context context, MetadataApplier metadataApplier, List> routes) { + this(operatorName, context, metadataApplier, routes, SchemaChangeBehavior.EVOLVE); + } + + public SchemaRegistry( + String operatorName, + OperatorCoordinator.Context context, + MetadataApplier metadataApplier, + List> routes, + SchemaChangeBehavior schemaChangeBehavior) { this.context = context; this.operatorName = operatorName; this.failedReasons = new HashMap<>(); this.metadataApplier = metadataApplier; this.routes = routes; + this.schemaChangeBehavior = schemaChangeBehavior; schemaManager = new SchemaManager(); schemaDerivation = new SchemaDerivation(schemaManager, routes, new HashMap<>()); requestHandler = - new SchemaRegistryRequestHandler(metadataApplier, schemaManager, schemaDerivation); + new SchemaRegistryRequestHandler( + metadataApplier, schemaManager, schemaDerivation, schemaChangeBehavior); } @Override @@ -207,7 +221,7 @@ public void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpointData schemaDerivation = new SchemaDerivation(schemaManager, routes, derivationMapping); requestHandler = new SchemaRegistryRequestHandler( - metadataApplier, schemaManager, schemaDerivation); + metadataApplier, schemaManager, schemaDerivation, schemaChangeBehavior); } } diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryProvider.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryProvider.java index 1f6e7aaf57a..524b09dca3b 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryProvider.java +++
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryProvider.java @@ -20,6 +20,7 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.cdc.common.annotation.Internal; import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior; import org.apache.flink.cdc.common.schema.Selectors; import org.apache.flink.cdc.common.sink.MetadataApplier; import org.apache.flink.runtime.jobgraph.OperatorID; @@ -37,16 +38,19 @@ public class SchemaRegistryProvider implements OperatorCoordinator.Provider { private final String operatorName; private final MetadataApplier metadataApplier; private final List> routingRules; + private final SchemaChangeBehavior schemaChangeBehavior; public SchemaRegistryProvider( OperatorID operatorID, String operatorName, MetadataApplier metadataApplier, - List> routingRules) { + List> routingRules, + SchemaChangeBehavior schemaChangeBehavior) { this.operatorID = operatorID; this.operatorName = operatorName; this.metadataApplier = metadataApplier; this.routingRules = routingRules; + this.schemaChangeBehavior = schemaChangeBehavior; } @Override @@ -69,6 +73,7 @@ public OperatorCoordinator create(OperatorCoordinator.Context context) throws Ex return new Tuple2<>(selectors, replaceBy); }) .collect(Collectors.toList()); - return new SchemaRegistry(operatorName, context, metadataApplier, routes); + return new SchemaRegistry( + operatorName, context, metadataApplier, routes, schemaChangeBehavior); } } diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java index 1ee06a7e53f..21ec2cf8583 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java +++
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java @@ -18,9 +18,11 @@ package org.apache.flink.cdc.runtime.operators.schema.coordinator; import org.apache.flink.cdc.common.annotation.Internal; +import org.apache.flink.cdc.common.annotation.VisibleForTesting; import org.apache.flink.cdc.common.event.CreateTableEvent; import org.apache.flink.cdc.common.event.SchemaChangeEvent; import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior; import org.apache.flink.cdc.common.sink.MetadataApplier; import org.apache.flink.cdc.runtime.operators.schema.event.RefreshPendingListsResponse; import org.apache.flink.cdc.runtime.operators.schema.event.ReleaseUpstreamRequest; @@ -30,6 +32,7 @@ import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeResponse; import org.apache.flink.runtime.operators.coordination.CoordinationResponse; +import org.apache.commons.collections.CollectionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -64,6 +67,8 @@ public class SchemaRegistryRequestHandler implements Closeable { private final SchemaDerivation schemaDerivation; + private final SchemaChangeBehavior schemaChangeBehavior; + /** * Not applied SchemaChangeRequest before receiving all flush success events for its table from * sink writers. @@ -75,20 +80,22 @@ public class SchemaRegistryRequestHandler implements Closeable { /** Status of the execution of current schema change request. */ private boolean isSchemaChangeApplying; /** Actual exception if failed to apply schema change. */ - private Exception schemaChangeException; + @VisibleForTesting Exception schemaChangeException; /** Executor service to execute schema change.
*/ private final ExecutorService schemaChangeThreadPool; public SchemaRegistryRequestHandler( MetadataApplier metadataApplier, SchemaManager schemaManager, - SchemaDerivation schemaDerivation) { + SchemaDerivation schemaDerivation, + SchemaChangeBehavior schemaChangeBehavior) { this.metadataApplier = metadataApplier; this.activeSinkWriters = new HashSet<>(); this.flushedSinkWriters = new HashSet<>(); this.pendingSchemaChanges = new LinkedList<>(); this.schemaManager = schemaManager; this.schemaDerivation = schemaDerivation; + this.schemaChangeBehavior = schemaChangeBehavior; schemaChangeThreadPool = Executors.newSingleThreadExecutor(); isSchemaChangeApplying = false; } @@ -99,11 +106,14 @@ public SchemaRegistryRequestHandler( * @param tableId the table need to change schema * @param derivedSchemaChangeEvents list of the schema changes */ - private void applySchemaChange( - TableId tableId, List derivedSchemaChangeEvents) { + @VisibleForTesting + void applySchemaChange(TableId tableId, List derivedSchemaChangeEvents) { isSchemaChangeApplying = true; schemaChangeException = null; try { + if (ignoreApplyingSchemaChanges(tableId, derivedSchemaChangeEvents)) { + return; + } for (SchemaChangeEvent changeEvent : derivedSchemaChangeEvents) { metadataApplier.applySchemaChange(changeEvent); LOG.debug("Apply schema change {} to table {}.", changeEvent, tableId); @@ -119,6 +129,32 @@ private void applySchemaChange( } } + /** + * Checks whether applying the derived schema changes must be skipped. 
+ * + * @return {@code true} if the behavior is IGNORE and the event list is non-empty + * @throws IllegalArgumentException if the behavior is EXCEPTION and the list is non-empty + */ + private boolean ignoreApplyingSchemaChanges( + TableId tableId, List derivedSchemaChangeEvents) { + if (CollectionUtils.isNotEmpty(derivedSchemaChangeEvents)) { + if (SchemaChangeBehavior.IGNORE.equals(schemaChangeBehavior)) { + LOG.debug( + "Ignore all derived schema changes, table {}, events {}.", + tableId, + derivedSchemaChangeEvents); + return true; + } + if (SchemaChangeBehavior.EXCEPTION.equals(schemaChangeBehavior)) { + throw new IllegalArgumentException( + String.format( + "Aborting execution as table %s encountered schema changes: %s", + tableId,
derivedSchemaChangeEvents)); + } + } + return false; + } + /** * Handle the {@link SchemaChangeRequest} and wait for all sink subtasks flushing. * diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandlerTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandlerTest.java new file mode 100644 index 00000000000..2866278c48e --- /dev/null +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandlerTest.java @@ -0,0 +1,84 @@ +package org.apache.flink.cdc.runtime.operators.schema.coordinator; + +import org.apache.flink.cdc.common.event.CreateTableEvent; +import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior; +import org.apache.flink.cdc.common.schema.Schema; +import org.apache.flink.cdc.common.types.DataTypes; +import org.apache.flink.cdc.runtime.testutils.schema.CollectingMetadataApplier; + +import org.apache.commons.collections.CollectionUtils; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.time.Duration; +import java.util.Collections; +import java.util.HashMap; + +/** Unit test for {@link SchemaRegistryRequestHandler}.
*/ +class SchemaRegistryRequestHandlerTest { + + private static final TableId CUSTOMERS = + TableId.tableId("my_company", "my_branch", "customers"); + private static final Schema CUSTOMERS_SCHEMA = + Schema.newBuilder() + .physicalColumn("id", DataTypes.INT()) + .physicalColumn("name", DataTypes.STRING()) + .physicalColumn("phone", DataTypes.BIGINT()) + .primaryKey("id") + .build(); + + @Test + public void testIgnoreApplySchemaChange() throws IOException { + CollectingMetadataApplier metadataApplier = + new CollectingMetadataApplier(Duration.ofSeconds(1)); + try (SchemaRegistryRequestHandler schemaRegistryRequestHandler = + newRequestHandler(metadataApplier, SchemaChangeBehavior.IGNORE)) { + schemaRegistryRequestHandler.applySchemaChange( + CUSTOMERS, + Collections.singletonList(new CreateTableEvent(CUSTOMERS, CUSTOMERS_SCHEMA))); + Assertions.assertTrue(CollectionUtils.isEmpty(metadataApplier.getSchemaChangeEvents())); + } + } + + @Test + public void testExceptionApplySchemaChange() throws IOException { + CollectingMetadataApplier metadataApplier = + new CollectingMetadataApplier(Duration.ofSeconds(1)); + + try (SchemaRegistryRequestHandler schemaRegistryRequestHandler = + newRequestHandler(metadataApplier, SchemaChangeBehavior.EXCEPTION)) { + schemaRegistryRequestHandler.applySchemaChange( + CUSTOMERS, + Collections.singletonList(new CreateTableEvent(CUSTOMERS, CUSTOMERS_SCHEMA))); + Assertions.assertTrue( + schemaRegistryRequestHandler.schemaChangeException + instanceof IllegalArgumentException); + } + } + + @Test + public void testEvolveApplySchemaChange() throws IOException { + CollectingMetadataApplier metadataApplier = + new CollectingMetadataApplier(Duration.ofSeconds(1)); + try (SchemaRegistryRequestHandler schemaRegistryRequestHandler = + newRequestHandler(metadataApplier, SchemaChangeBehavior.EVOLVE)) { + schemaRegistryRequestHandler.applySchemaChange( + CUSTOMERS, + Collections.singletonList(new CreateTableEvent(CUSTOMERS, CUSTOMERS_SCHEMA))); +
Assertions.assertTrue( + CollectionUtils.isNotEmpty(metadataApplier.getSchemaChangeEvents())); + } + } + + private static SchemaRegistryRequestHandler newRequestHandler( + CollectingMetadataApplier metadataApplier, SchemaChangeBehavior behavior) { + SchemaManager schemaManager = new SchemaManager(); + return new SchemaRegistryRequestHandler( + metadataApplier, + schemaManager, + new SchemaDerivation(schemaManager, Collections.emptyList(), new HashMap<>()), + behavior); + } +}