diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java index 8ea3a1f93b4..617daed92d3 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java @@ -104,6 +104,12 @@ public class SchemaRegistry implements OperatorCoordinator, CoordinationRequestH private SchemaChangeBehavior schemaChangeBehavior; + /** + * Current parallelism. Use this to verify if Schema Registry has collected enough flush success + * events from sink operators. + */ + private int currentParallelism; + public SchemaRegistry( String operatorName, OperatorCoordinator.Context context, @@ -135,7 +141,9 @@ public SchemaRegistry( public void start() throws Exception { LOG.info("Starting SchemaRegistry for {}.", operatorName); this.failedReasons.clear(); - LOG.info("Started SchemaRegistry for {}.", operatorName); + this.currentParallelism = context.currentParallelism(); + LOG.info( + "Started SchemaRegistry for {}. Parallelism: {}", operatorName, currentParallelism); } @Override @@ -155,7 +163,9 @@ public void handleEventFromOperator(int subtask, int attemptNumber, OperatorEven flushSuccessEvent.getSubtask(), flushSuccessEvent.getTableId().toString()); requestHandler.flushSuccess( - flushSuccessEvent.getTableId(), flushSuccessEvent.getSubtask()); + flushSuccessEvent.getTableId(), + flushSuccessEvent.getSubtask(), + currentParallelism); } else if (event instanceof SinkWriterRegisterEvent) { requestHandler.registerSinkWriter(((SinkWriterRegisterEvent) event).getSubtask()); } else { diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java index 99019a6b407..444fb41d21d 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java @@ -48,11 +48,11 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicReference; @@ -103,8 +103,8 @@ public SchemaRegistryRequestHandler( this.schemaDerivation = schemaDerivation; this.schemaChangeBehavior = schemaChangeBehavior; - this.activeSinkWriters = new HashSet<>(); - this.flushedSinkWriters = new HashSet<>(); + this.activeSinkWriters = ConcurrentHashMap.newKeySet(); + this.flushedSinkWriters = ConcurrentHashMap.newKeySet(); this.schemaChangeThreadPool = Executors.newSingleThreadExecutor(); this.currentDerivedSchemaChangeEvents = new ArrayList<>(); @@ -122,7 +122,7 @@ public CompletableFuture handleSchemaChangeRequest( SchemaChangeRequest request) { if (schemaChangeStatus.compareAndSet(RequestStatus.IDLE, RequestStatus.WAITING_FOR_FLUSH)) { LOG.info( - "Received schema change event request {} from table {}. Start to buffer requests for others.", + "Received schema change event request {} from table {}. SchemaChangeStatus switched from IDLE to WAITING_FOR_FLUSH, other requests will be blocked.", request.getSchemaChangeEvent(), request.getTableId().toString()); SchemaChangeEvent event = request.getSchemaChangeEvent(); @@ -134,7 +134,11 @@ public CompletableFuture handleSchemaChangeRequest( Preconditions.checkState( schemaChangeStatus.compareAndSet( RequestStatus.WAITING_FOR_FLUSH, RequestStatus.IDLE), - "Illegal schemaChangeStatus state: should still in WAITING_FOR_FLUSH state if event was duplicated."); + "Illegal schemaChangeStatus state: should still in WAITING_FOR_FLUSH state if event was duplicated, not " + + schemaChangeStatus.get()); + LOG.info( + "SchemaChangeStatus switched from WAITING_FOR_FLUSH to IDLE for request {} due to duplicated request.", + request); return CompletableFuture.completedFuture(wrap(SchemaChangeResponse.duplicate())); } schemaManager.applyOriginalSchemaChange(event); @@ -149,7 +153,11 @@ public CompletableFuture handleSchemaChangeRequest( Preconditions.checkState( schemaChangeStatus.compareAndSet( RequestStatus.WAITING_FOR_FLUSH, RequestStatus.IDLE), - "Illegal schemaChangeStatus state: should still in WAITING_FOR_FLUSH state if event was ignored."); + "Illegal schemaChangeStatus state: should still in WAITING_FOR_FLUSH state if event was ignored, not " + + schemaChangeStatus.get()); + LOG.info( + "SchemaChangeStatus switched from WAITING_FOR_FLUSH to IDLE for request {} due to ignored request.", + request); return CompletableFuture.completedFuture(wrap(SchemaChangeResponse.ignored())); } @@ -220,7 +228,11 @@ private void applySchemaChange( } Preconditions.checkState( schemaChangeStatus.compareAndSet(RequestStatus.APPLYING, RequestStatus.FINISHED), - "Illegal schemaChangeStatus state: should be APPLYING before applySchemaChange finishes"); + "Illegal schemaChangeStatus state: should be APPLYING before applySchemaChange finishes, not " + + schemaChangeStatus.get()); + LOG.info( + "SchemaChangeStatus switched from APPLYING to FINISHED for request {}.", + currentDerivedSchemaChangeEvents); } /** @@ -239,13 +251,21 @@ public void registerSinkWriter(int sinkSubtask) { * @param tableId the subtask in SchemaOperator and table that the FlushEvent is about * @param sinkSubtask the sink subtask succeed flushing */ - public void flushSuccess(TableId tableId, int sinkSubtask) { + public void flushSuccess(TableId tableId, int sinkSubtask, int parallelism) { flushedSinkWriters.add(sinkSubtask); + if (activeSinkWriters.size() < parallelism) { + LOG.info( + "Not all active sink writers have been registered. Current {}, expected {}.", + activeSinkWriters.size(), + parallelism); + return; + } if (flushedSinkWriters.equals(activeSinkWriters)) { Preconditions.checkState( schemaChangeStatus.compareAndSet( RequestStatus.WAITING_FOR_FLUSH, RequestStatus.APPLYING), - "Illegal schemaChangeStatus state: should be WAITING_FOR_FLUSH before collecting enough FlushEvents"); + "Illegal schemaChangeStatus state: should be WAITING_FOR_FLUSH before collecting enough FlushEvents, not " + + schemaChangeStatus); LOG.info( "All sink subtask have flushed for table {}. Start to apply schema change.", tableId.toString()); @@ -259,6 +279,10 @@ public CompletableFuture getSchemaChangeResult() { !schemaChangeStatus.get().equals(RequestStatus.IDLE), "Illegal schemaChangeStatus: should not be IDLE before getting schema change request results."); if (schemaChangeStatus.compareAndSet(RequestStatus.FINISHED, RequestStatus.IDLE)) { + LOG.info( + "SchemaChangeStatus switched from FINISHED to IDLE for request {}", + currentDerivedSchemaChangeEvents); + // This request has been finished, return it and prepare for the next request List finishedEvents = clearCurrentSchemaChangeRequest(); return CompletableFuture.supplyAsync(