Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,12 @@ public class SchemaRegistry implements OperatorCoordinator, CoordinationRequestH

private SchemaChangeBehavior schemaChangeBehavior;

/**
* Current parallelism. Use this to verify if Schema Registry has collected enough flush success
* events from sink operators.
*/
private int currentParallelism;

public SchemaRegistry(
String operatorName,
OperatorCoordinator.Context context,
Expand Down Expand Up @@ -135,7 +141,9 @@ public SchemaRegistry(
public void start() throws Exception {
LOG.info("Starting SchemaRegistry for {}.", operatorName);
this.failedReasons.clear();
LOG.info("Started SchemaRegistry for {}.", operatorName);
this.currentParallelism = context.currentParallelism();
LOG.info(
"Started SchemaRegistry for {}. Parallelism: {}", operatorName, currentParallelism);
}

@Override
Expand All @@ -155,7 +163,9 @@ public void handleEventFromOperator(int subtask, int attemptNumber, OperatorEven
flushSuccessEvent.getSubtask(),
flushSuccessEvent.getTableId().toString());
requestHandler.flushSuccess(
flushSuccessEvent.getTableId(), flushSuccessEvent.getSubtask());
flushSuccessEvent.getTableId(),
flushSuccessEvent.getSubtask(),
currentParallelism);
} else if (event instanceof SinkWriterRegisterEvent) {
requestHandler.registerSinkWriter(((SinkWriterRegisterEvent) event).getSubtask());
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,11 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -103,8 +103,8 @@ public SchemaRegistryRequestHandler(
this.schemaDerivation = schemaDerivation;
this.schemaChangeBehavior = schemaChangeBehavior;

this.activeSinkWriters = new HashSet<>();
this.flushedSinkWriters = new HashSet<>();
this.activeSinkWriters = ConcurrentHashMap.newKeySet();
this.flushedSinkWriters = ConcurrentHashMap.newKeySet();
this.schemaChangeThreadPool = Executors.newSingleThreadExecutor();

this.currentDerivedSchemaChangeEvents = new ArrayList<>();
Expand All @@ -122,7 +122,7 @@ public CompletableFuture<CoordinationResponse> handleSchemaChangeRequest(
SchemaChangeRequest request) {
if (schemaChangeStatus.compareAndSet(RequestStatus.IDLE, RequestStatus.WAITING_FOR_FLUSH)) {
LOG.info(
"Received schema change event request {} from table {}. Start to buffer requests for others.",
"Received schema change event request {} from table {}. SchemaChangeStatus switched from IDLE to WAITING_FOR_FLUSH, other requests will be blocked.",
request.getSchemaChangeEvent(),
request.getTableId().toString());
SchemaChangeEvent event = request.getSchemaChangeEvent();
Expand All @@ -134,7 +134,11 @@ public CompletableFuture<CoordinationResponse> handleSchemaChangeRequest(
Preconditions.checkState(
schemaChangeStatus.compareAndSet(
RequestStatus.WAITING_FOR_FLUSH, RequestStatus.IDLE),
"Illegal schemaChangeStatus state: should still in WAITING_FOR_FLUSH state if event was duplicated.");
"Illegal schemaChangeStatus state: should still in WAITING_FOR_FLUSH state if event was duplicated, not "
+ schemaChangeStatus.get());
LOG.info(
"SchemaChangeStatus switched from WAITING_FOR_FLUSH to IDLE for request {} due to duplicated request.",
request);
return CompletableFuture.completedFuture(wrap(SchemaChangeResponse.duplicate()));
}
schemaManager.applyOriginalSchemaChange(event);
Expand All @@ -149,7 +153,11 @@ public CompletableFuture<CoordinationResponse> handleSchemaChangeRequest(
Preconditions.checkState(
schemaChangeStatus.compareAndSet(
RequestStatus.WAITING_FOR_FLUSH, RequestStatus.IDLE),
"Illegal schemaChangeStatus state: should still in WAITING_FOR_FLUSH state if event was ignored.");
"Illegal schemaChangeStatus state: should still in WAITING_FOR_FLUSH state if event was ignored, not "
+ schemaChangeStatus.get());
LOG.info(
"SchemaChangeStatus switched from WAITING_FOR_FLUSH to IDLE for request {} due to ignored request.",
request);
return CompletableFuture.completedFuture(wrap(SchemaChangeResponse.ignored()));
}

Expand Down Expand Up @@ -220,7 +228,11 @@ private void applySchemaChange(
}
Preconditions.checkState(
schemaChangeStatus.compareAndSet(RequestStatus.APPLYING, RequestStatus.FINISHED),
"Illegal schemaChangeStatus state: should be APPLYING before applySchemaChange finishes");
"Illegal schemaChangeStatus state: should be APPLYING before applySchemaChange finishes, not "
+ schemaChangeStatus.get());
LOG.info(
"SchemaChangeStatus switched from APPLYING to FINISHED for request {}.",
currentDerivedSchemaChangeEvents);
}

/**
Expand All @@ -239,13 +251,21 @@ public void registerSinkWriter(int sinkSubtask) {
* @param tableId the subtask in SchemaOperator and table that the FlushEvent is about
* @param sinkSubtask the sink subtask succeed flushing
*/
public void flushSuccess(TableId tableId, int sinkSubtask) {
public void flushSuccess(TableId tableId, int sinkSubtask, int parallelism) {
flushedSinkWriters.add(sinkSubtask);
if (activeSinkWriters.size() < parallelism) {
LOG.info(
"Not all active sink writers have been registered. Current {}, expected {}.",
activeSinkWriters.size(),
parallelism);
return;
}
if (flushedSinkWriters.equals(activeSinkWriters)) {
Preconditions.checkState(
schemaChangeStatus.compareAndSet(
RequestStatus.WAITING_FOR_FLUSH, RequestStatus.APPLYING),
"Illegal schemaChangeStatus state: should be WAITING_FOR_FLUSH before collecting enough FlushEvents");
"Illegal schemaChangeStatus state: should be WAITING_FOR_FLUSH before collecting enough FlushEvents, not "
+ schemaChangeStatus);
LOG.info(
"All sink subtask have flushed for table {}. Start to apply schema change.",
tableId.toString());
Expand All @@ -259,6 +279,10 @@ public CompletableFuture<CoordinationResponse> getSchemaChangeResult() {
!schemaChangeStatus.get().equals(RequestStatus.IDLE),
"Illegal schemaChangeStatus: should not be IDLE before getting schema change request results.");
if (schemaChangeStatus.compareAndSet(RequestStatus.FINISHED, RequestStatus.IDLE)) {
LOG.info(
"SchemaChangeStatus switched from FINISHED to IDLE for request {}",
currentDerivedSchemaChangeEvents);

// This request has been finished, return it and prepare for the next request
List<SchemaChangeEvent> finishedEvents = clearCurrentSchemaChangeRequest();
return CompletableFuture.supplyAsync(
Expand Down