Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@

import org.apache.flink.cdc.common.annotation.PublicEvolving;

import java.io.Serializable;

/** Behavior for handling schema changes. */
@PublicEvolving
public enum SchemaChangeBehavior {
public enum SchemaChangeBehavior implements Serializable {
EVOLVE,
IGNORE,
EXCEPTION
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.sink.MetadataApplier;
Expand Down Expand Up @@ -57,18 +56,7 @@ public DataStream<Event> translate(
int parallelism,
MetadataApplier metadataApplier,
List<RouteDef> routes) {
switch (schemaChangeBehavior) {
case EVOLVE:
return addSchemaOperator(input, parallelism, metadataApplier, routes);
case IGNORE:
return dropSchemaChangeEvent(input, parallelism);
case EXCEPTION:
return exceptionOnSchemaChange(input, parallelism);
default:
throw new IllegalArgumentException(
String.format(
"Unrecognized schema change behavior: %s", schemaChangeBehavior));
}
return addSchemaOperator(input, parallelism, metadataApplier, routes);
}

public String getSchemaOperatorUid() {
Expand All @@ -89,27 +77,9 @@ private DataStream<Event> addSchemaOperator(
input.transform(
"SchemaOperator",
new EventTypeInfo(),
new SchemaOperatorFactory(metadataApplier, routingRules, rpcTimeOut));
new SchemaOperatorFactory(
metadataApplier, routingRules, rpcTimeOut, schemaChangeBehavior));
stream.uid(schemaOperatorUid).setParallelism(parallelism);
return stream;
}

private DataStream<Event> dropSchemaChangeEvent(DataStream<Event> input, int parallelism) {
return input.filter(event -> !(event instanceof SchemaChangeEvent))
.setParallelism(parallelism);
}

private DataStream<Event> exceptionOnSchemaChange(DataStream<Event> input, int parallelism) {
return input.map(
event -> {
if (event instanceof SchemaChangeEvent) {
throw new RuntimeException(
String.format(
"Aborting execution as the pipeline encountered a schema change event: %s",
event));
}
return event;
})
.setParallelism(parallelism);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.sink.MetadataApplier;
import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistryProvider;
import org.apache.flink.runtime.jobgraph.OperatorID;
Expand All @@ -41,19 +42,23 @@ public class SchemaOperatorFactory extends SimpleOperatorFactory<Event>

private final MetadataApplier metadataApplier;
private final List<Tuple2<String, TableId>> routingRules;
private final SchemaChangeBehavior schemaChangeBehavior;

public SchemaOperatorFactory(
MetadataApplier metadataApplier,
List<Tuple2<String, TableId>> routingRules,
Duration rpcTimeOut) {
Duration rpcTimeOut,
SchemaChangeBehavior schemaChangeBehavior) {
super(new SchemaOperator(routingRules, rpcTimeOut));
this.metadataApplier = metadataApplier;
this.routingRules = routingRules;
this.schemaChangeBehavior = schemaChangeBehavior;
}

@Override
public OperatorCoordinator.Provider getCoordinatorProvider(
String operatorName, OperatorID operatorID) {
return new SchemaRegistryProvider(operatorID, operatorName, metadataApplier, routingRules);
return new SchemaRegistryProvider(
operatorID, operatorName, metadataApplier, routingRules, schemaChangeBehavior);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.schema.Selectors;
import org.apache.flink.cdc.common.sink.MetadataApplier;
import org.apache.flink.cdc.runtime.operators.schema.SchemaOperator;
Expand Down Expand Up @@ -90,6 +91,8 @@ public class SchemaRegistry implements OperatorCoordinator, CoordinationRequestH

private final List<Tuple2<Selectors, TableId>> routes;

private final SchemaChangeBehavior schemaChangeBehavior;

/** The request handler that handle all requests and events. */
private SchemaRegistryRequestHandler requestHandler;

Expand All @@ -103,15 +106,26 @@ public SchemaRegistry(
OperatorCoordinator.Context context,
MetadataApplier metadataApplier,
List<Tuple2<Selectors, TableId>> routes) {
this(operatorName, context, metadataApplier, routes, SchemaChangeBehavior.EVOLVE);
}

public SchemaRegistry(
String operatorName,
OperatorCoordinator.Context context,
MetadataApplier metadataApplier,
List<Tuple2<Selectors, TableId>> routes,
SchemaChangeBehavior schemaChangeBehavior) {
this.context = context;
this.operatorName = operatorName;
this.failedReasons = new HashMap<>();
this.metadataApplier = metadataApplier;
this.routes = routes;
this.schemaChangeBehavior = schemaChangeBehavior;
schemaManager = new SchemaManager();
schemaDerivation = new SchemaDerivation(schemaManager, routes, new HashMap<>());
requestHandler =
new SchemaRegistryRequestHandler(metadataApplier, schemaManager, schemaDerivation);
new SchemaRegistryRequestHandler(
metadataApplier, schemaManager, schemaDerivation, schemaChangeBehavior);
}

@Override
Expand Down Expand Up @@ -207,7 +221,7 @@ public void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpointData
schemaDerivation = new SchemaDerivation(schemaManager, routes, derivationMapping);
requestHandler =
new SchemaRegistryRequestHandler(
metadataApplier, schemaManager, schemaDerivation);
metadataApplier, schemaManager, schemaDerivation, schemaChangeBehavior);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.schema.Selectors;
import org.apache.flink.cdc.common.sink.MetadataApplier;
import org.apache.flink.runtime.jobgraph.OperatorID;
Expand All @@ -37,16 +38,19 @@ public class SchemaRegistryProvider implements OperatorCoordinator.Provider {
private final String operatorName;
private final MetadataApplier metadataApplier;
private final List<Tuple2<String, TableId>> routingRules;
private final SchemaChangeBehavior schemaChangeBehavior;

public SchemaRegistryProvider(
OperatorID operatorID,
String operatorName,
MetadataApplier metadataApplier,
List<Tuple2<String, TableId>> routingRules) {
List<Tuple2<String, TableId>> routingRules,
SchemaChangeBehavior schemaChangeBehavior) {
this.operatorID = operatorID;
this.operatorName = operatorName;
this.metadataApplier = metadataApplier;
this.routingRules = routingRules;
this.schemaChangeBehavior = schemaChangeBehavior;
}

@Override
Expand All @@ -69,6 +73,7 @@ public OperatorCoordinator create(OperatorCoordinator.Context context) throws Ex
return new Tuple2<>(selectors, replaceBy);
})
.collect(Collectors.toList());
return new SchemaRegistry(operatorName, context, metadataApplier, routes);
return new SchemaRegistry(
operatorName, context, metadataApplier, routes, schemaChangeBehavior);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
package org.apache.flink.cdc.runtime.operators.schema.coordinator;

import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.annotation.VisibleForTesting;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.sink.MetadataApplier;
import org.apache.flink.cdc.runtime.operators.schema.event.RefreshPendingListsResponse;
import org.apache.flink.cdc.runtime.operators.schema.event.ReleaseUpstreamRequest;
Expand All @@ -30,6 +32,7 @@
import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeResponse;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;

import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -64,6 +67,8 @@ public class SchemaRegistryRequestHandler implements Closeable {

private final SchemaDerivation schemaDerivation;

private final SchemaChangeBehavior schemaChangeBehavior;

/**
* Not applied SchemaChangeRequest before receiving all flush success events for its table from
* sink writers.
Expand All @@ -75,20 +80,22 @@ public class SchemaRegistryRequestHandler implements Closeable {
/** Status of the execution of current schema change request. */
private boolean isSchemaChangeApplying;
/** Actual exception if failed to apply schema change. */
private Exception schemaChangeException;
@VisibleForTesting Exception schemaChangeException;
/** Executor service to execute schema change. */
private final ExecutorService schemaChangeThreadPool;

public SchemaRegistryRequestHandler(
MetadataApplier metadataApplier,
SchemaManager schemaManager,
SchemaDerivation schemaDerivation) {
SchemaDerivation schemaDerivation,
SchemaChangeBehavior schemaChangeBehavior) {
this.metadataApplier = metadataApplier;
this.activeSinkWriters = new HashSet<>();
this.flushedSinkWriters = new HashSet<>();
this.pendingSchemaChanges = new LinkedList<>();
this.schemaManager = schemaManager;
this.schemaDerivation = schemaDerivation;
this.schemaChangeBehavior = schemaChangeBehavior;
schemaChangeThreadPool = Executors.newSingleThreadExecutor();
isSchemaChangeApplying = false;
}
Expand All @@ -99,11 +106,14 @@ public SchemaRegistryRequestHandler(
* @param tableId the table need to change schema
* @param derivedSchemaChangeEvents list of the schema changes
*/
private void applySchemaChange(
TableId tableId, List<SchemaChangeEvent> derivedSchemaChangeEvents) {
@VisibleForTesting
void applySchemaChange(TableId tableId, List<SchemaChangeEvent> derivedSchemaChangeEvents) {
isSchemaChangeApplying = true;
schemaChangeException = null;
try {
if (ignoreApplyingSchemeChanges(tableId, derivedSchemaChangeEvents)) {
return;
}
for (SchemaChangeEvent changeEvent : derivedSchemaChangeEvents) {
metadataApplier.applySchemaChange(changeEvent);
LOG.debug("Apply schema change {} to table {}.", changeEvent, tableId);
Expand All @@ -119,6 +129,26 @@ private void applySchemaChange(
}
}

private boolean ignoreApplyingSchemeChanges(
TableId tableId, List<SchemaChangeEvent> derivedSchemaChangeEvents) {
if (CollectionUtils.isNotEmpty(derivedSchemaChangeEvents)) {
if (SchemaChangeBehavior.IGNORE.equals(schemaChangeBehavior)) {
LOG.debug(
"Ignore all derived schema changes, table {}, events {}.",
tableId,
derivedSchemaChangeEvents);
return true;
}
if (SchemaChangeBehavior.EXCEPTION.equals(schemaChangeBehavior)) {
throw new IllegalArgumentException(
String.format(
"Unrecognized schema change for table %s behavior: %s",
tableId, schemaChangeBehavior));
}
}
return false;
}

/**
* Handle the {@link SchemaChangeRequest} and wait for all sink subtasks flushing.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package org.apache.flink.cdc.runtime.operators.schema.coordinator;

import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.runtime.testutils.schema.CollectingMetadataApplier;

import org.apache.commons.collections.CollectionUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;

/** Unit test for {@link SchemaRegistryRequestHandler}. */
class SchemaRegistryRequestHandlerTest {

private static final TableId CUSTOMERS =
TableId.tableId("my_company", "my_branch", "customers");
private static final Schema CUSTOMERS_SCHEMA =
Schema.newBuilder()
.physicalColumn("id", DataTypes.INT())
.physicalColumn("name", DataTypes.STRING())
.physicalColumn("phone", DataTypes.BIGINT())
.primaryKey("id")
.build();

@Test
public void testIgnoreApplySchemaChange() throws IOException {
CollectingMetadataApplier metadataApplier =
new CollectingMetadataApplier(Duration.ofSeconds(1));
try (SchemaRegistryRequestHandler schemaRegistryRequestHandler =
newRequestHandler(metadataApplier, SchemaChangeBehavior.IGNORE)) {
schemaRegistryRequestHandler.applySchemaChange(
CUSTOMERS,
Collections.singletonList(new CreateTableEvent(CUSTOMERS, CUSTOMERS_SCHEMA)));
Assertions.assertTrue(CollectionUtils.isEmpty(metadataApplier.getSchemaChangeEvents()));
}
}

@Test
public void testExceptionApplySchemaChange() throws IOException {
CollectingMetadataApplier metadataApplier =
new CollectingMetadataApplier(Duration.ofSeconds(1));

try (SchemaRegistryRequestHandler schemaRegistryRequestHandler =
newRequestHandler(metadataApplier, SchemaChangeBehavior.EXCEPTION)) {
schemaRegistryRequestHandler.applySchemaChange(
CUSTOMERS,
Collections.singletonList(new CreateTableEvent(CUSTOMERS, CUSTOMERS_SCHEMA)));
Assertions.assertTrue(
schemaRegistryRequestHandler.schemaChangeException
instanceof IllegalArgumentException);
}
}

@Test
public void testEvolveApplySchemaChange() throws IOException {
CollectingMetadataApplier metadataApplier =
new CollectingMetadataApplier(Duration.ofSeconds(1));
try (SchemaRegistryRequestHandler schemaRegistryRequestHandler =
newRequestHandler(metadataApplier, SchemaChangeBehavior.EVOLVE)) {
schemaRegistryRequestHandler.applySchemaChange(
CUSTOMERS,
Collections.singletonList(new CreateTableEvent(CUSTOMERS, CUSTOMERS_SCHEMA)));
Assertions.assertTrue(
CollectionUtils.isNotEmpty(metadataApplier.getSchemaChangeEvents()));
}
}

private static SchemaRegistryRequestHandler newRequestHandler(
CollectingMetadataApplier metadataApplier, SchemaChangeBehavior behavior) {
SchemaManager schemaManager = new SchemaManager();
return new SchemaRegistryRequestHandler(
metadataApplier,
schemaManager,
new SchemaDerivation(schemaManager, Collections.emptyList(), new HashMap<>()),
behavior);
}
}