Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,20 @@ public class CoordinatorContext {

private final Set<TablePartition> partitionsToBeDeleted = new HashSet<>();

/**
* Tables/partitions whose deletion has failed and are temporarily ineligible for deletion
* retry. A table/partition is marked ineligible when one of its replicas could not be deleted
* (e.g., a TabletServer hosting the replica is offline or stopReplica RPC failed). The mark is
* removed when the relevant TabletServer reconnects, allowing deletion to be retried.
*
* <p>The value stores the most recent reason for being marked ineligible, used for diagnostic
* logging — mirrors the {@code reason} argument of Kafka's {@code
* markTopicIneligibleForDeletion}.
*/
private final Map<Long, String> tablesIneligibleForDeletion = new HashMap<>();

private final Map<TablePartition, String> partitionsIneligibleForDeletion = new HashMap<>();

/**
* A mapping from tablet server to offline buckets. When the leader replica of a table bucket
* become offline, we'll put the entry tablet_server_id -> table_bucket to this map so that we
Expand Down Expand Up @@ -652,8 +666,67 @@ public void queuePartitionDeletion(Set<TablePartition> tablePartitions) {
partitionsToBeDeleted.addAll(tablePartitions);
}

/**
* Mark a table as ineligible for deletion (only effective if the table is queued for deletion).
* Mirrors Kafka's {@code markTopicIneligibleForDeletion}. The {@code reason} is retained for
* diagnostic logging and is overwritten on subsequent marks.
*/
public void markTableIneligibleForDeletion(long tableId, String reason) {
if (tablesToBeDeleted.contains(tableId)) {
String prev = tablesIneligibleForDeletion.put(tableId, reason);
if (prev == null) {
LOG.info("Marking table {} ineligible for deletion. Reason: {}", tableId, reason);
}
}
}

public void markPartitionIneligibleForDeletion(TablePartition tablePartition, String reason) {
if (partitionsToBeDeleted.contains(tablePartition)) {
String prev = partitionsIneligibleForDeletion.put(tablePartition, reason);
if (prev == null) {
LOG.info(
"Marking partition {} ineligible for deletion. Reason: {}",
tablePartition,
reason);
}
}
}

/** Remove the ineligible mark, allowing deletion to be retried. */
public void markTableEligibleForDeletion(long tableId) {
tablesIneligibleForDeletion.remove(tableId);
}

public void markPartitionEligibleForDeletion(TablePartition tablePartition) {
partitionsIneligibleForDeletion.remove(tablePartition);
}

public boolean isTableIneligibleForDeletion(long tableId) {
return tablesIneligibleForDeletion.containsKey(tableId);
}

public boolean isPartitionIneligibleForDeletion(TablePartition tablePartition) {
return partitionsIneligibleForDeletion.containsKey(tablePartition);
}

public Set<TableBucketReplica> getReplicasInState(long tableId, ReplicaState state) {
return getAllReplicasForTable(tableId).stream()
.filter(r -> getReplicaState(r) == state)
.collect(Collectors.toSet());
}

public Set<TableBucketReplica> getReplicasInState(
TablePartition tablePartition, ReplicaState state) {
return getAllReplicasForPartition(
tablePartition.getTableId(), tablePartition.getPartitionId())
.stream()
.filter(r -> getReplicaState(r) == state)
.collect(Collectors.toSet());
}

public void removeTable(long tableId) {
tablesToBeDeleted.remove(tableId);
tablesIneligibleForDeletion.remove(tableId);
Map<Integer, List<Integer>> assignment = tableAssignments.remove(tableId);
if (assignment != null) {
// remove leadership info for each bucket from the context
Expand All @@ -671,6 +744,7 @@ public void removeTable(long tableId) {

public void removePartition(TablePartition tablePartition) {
partitionsToBeDeleted.remove(tablePartition);
partitionsIneligibleForDeletion.remove(tablePartition);
Map<Integer, List<Integer>> assignment = partitionAssignments.remove(tablePartition);
if (assignment != null) {
// remove leadership info for each bucket from the context
Expand Down Expand Up @@ -728,6 +802,9 @@ private void clearTablesState() {

public void resetContext() {
tablesToBeDeleted.clear();
partitionsToBeDeleted.clear();
tablesIneligibleForDeletion.clear();
partitionsIneligibleForDeletion.clear();
clearTablesState();
liveTabletServers.clear();
liveCoordinatorServers.clear();
Expand Down
Loading
Loading