diff --git a/helix-core/src/main/java/org/apache/helix/HelixRebalanceException.java b/helix-core/src/main/java/org/apache/helix/HelixRebalanceException.java index a8b5055fe8..d54853f96c 100644 --- a/helix-core/src/main/java/org/apache/helix/HelixRebalanceException.java +++ b/helix-core/src/main/java/org/apache/helix/HelixRebalanceException.java @@ -23,10 +23,13 @@ * Exception thrown by Helix due to rebalance failures. */ public class HelixRebalanceException extends Exception { + // TODO: Adding static description or other necessary fields into the enum instances for + // TODO: supporting the rebalance monitor to understand the exception. public enum Type { INVALID_CLUSTER_STATUS, INVALID_REBALANCER_STATUS, FAILED_TO_CALCULATE, + INVALID_INPUT, UNKNOWN_FAILURE } diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java index fd655d13ad..a540ffbca8 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java @@ -20,13 +20,15 @@ */ import java.io.IOException; -import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.Map; + +import com.google.common.annotations.VisibleForTesting; +import org.I0Itec.zkclient.exception.ZkNoNodeException; import org.I0Itec.zkclient.serialize.ZkSerializer; import org.apache.helix.BucketDataAccessor; import org.apache.helix.HelixException; -import org.apache.helix.HelixManager; import org.apache.helix.HelixProperty; import org.apache.helix.ZNRecord; import org.apache.helix.manager.zk.ZNRecordJacksonSerializer; @@ -50,23 +52,27 @@ public class AssignmentMetadataStore { private Map _globalBaseline; private Map _bestPossibleAssignment; + AssignmentMetadataStore(String metadataStoreAddrs, String 
clusterName) { + this(new ZkBucketDataAccessor(metadataStoreAddrs), clusterName); + } + AssignmentMetadataStore(BucketDataAccessor bucketDataAccessor, String clusterName) { _dataAccessor = bucketDataAccessor; _baselinePath = String.format(BASELINE_TEMPLATE, clusterName, ASSIGNMENT_METADATA_KEY); _bestPossiblePath = String.format(BEST_POSSIBLE_TEMPLATE, clusterName, ASSIGNMENT_METADATA_KEY); } - AssignmentMetadataStore(HelixManager helixManager) { - this(new ZkBucketDataAccessor(helixManager.getMetadataStoreConnectionString()), - helixManager.getClusterName()); - } - public Map getBaseline() { // Return the in-memory baseline. If null, read from ZK. This is to minimize reads from ZK if (_globalBaseline == null) { - HelixProperty baseline = - _dataAccessor.compressedBucketRead(_baselinePath, HelixProperty.class); - _globalBaseline = splitAssignments(baseline); + try { + HelixProperty baseline = + _dataAccessor.compressedBucketRead(_baselinePath, HelixProperty.class); + _globalBaseline = splitAssignments(baseline); + } catch (ZkNoNodeException ex) { + // Metadata does not exist, so return an empty map + _globalBaseline = Collections.emptyMap(); + } } return _globalBaseline; } @@ -74,9 +80,14 @@ public Map getBaseline() { public Map getBestPossibleAssignment() { // Return the in-memory baseline. If null, read from ZK. 
This is to minimize reads from ZK if (_bestPossibleAssignment == null) { - HelixProperty baseline = - _dataAccessor.compressedBucketRead(_bestPossiblePath, HelixProperty.class); - _bestPossibleAssignment = splitAssignments(baseline); + try { + HelixProperty baseline = + _dataAccessor.compressedBucketRead(_bestPossiblePath, HelixProperty.class); + _bestPossibleAssignment = splitAssignments(baseline); + } catch (ZkNoNodeException ex) { + // Metadata does not exist, so return an empty map + _bestPossibleAssignment = Collections.emptyMap(); + } } return _bestPossibleAssignment; } @@ -113,6 +124,16 @@ public void persistBestPossibleAssignment( _bestPossibleAssignment = bestPossibleAssignment; } + protected void finalize() { + // To ensure all resources are released. + close(); + } + + // Close to release all the resources. + public void close() { + _dataAccessor.disconnect(); + } + /** * Produces one HelixProperty that contains all assignment data. * @param name @@ -123,8 +144,9 @@ private HelixProperty combineAssignments(String name, Map assignmentMap) { HelixProperty property = new HelixProperty(name); // Add each resource's assignment as a simple field in one ZNRecord + // Note: do not use Arrays.toString() to convert the record. The deserialization will fail. 
assignmentMap.forEach((resource, assignment) -> property.getRecord().setSimpleField(resource, - Arrays.toString(SERIALIZER.serialize(assignment.getRecord())))); + new String(SERIALIZER.serialize(assignment.getRecord())))); return property; } @@ -138,8 +160,8 @@ private Map splitAssignments(HelixProperty property) // Convert each resource's assignment String into a ResourceAssignment object and put it in a // map property.getRecord().getSimpleFields() - .forEach((resource, assignment) -> assignmentMap.put(resource, - new ResourceAssignment((ZNRecord) SERIALIZER.deserialize(assignment.getBytes())))); + .forEach((resource, assignmentStr) -> assignmentMap.put(resource, + new ResourceAssignment((ZNRecord) SERIALIZER.deserialize(assignmentStr.getBytes())))); return assignmentMap; } } diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java index 551239d6ee..1861e107a3 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java @@ -29,6 +29,7 @@ import java.util.stream.Collectors; import org.apache.helix.HelixConstants; +import org.apache.helix.HelixException; import org.apache.helix.HelixManager; import org.apache.helix.HelixRebalanceException; import org.apache.helix.controller.changedetector.ResourceChangeDetector; @@ -64,27 +65,34 @@ public class WagedRebalancer { // When any of the following change happens, the rebalancer needs to do a global rebalance which // contains 1. baseline recalculate, 2. partial rebalance that is based on the new baseline. 
private static final Set GLOBAL_REBALANCE_REQUIRED_CHANGE_TYPES = - ImmutableSet.of(HelixConstants.ChangeType.RESOURCE_CONFIG, - HelixConstants.ChangeType.CLUSTER_CONFIG, HelixConstants.ChangeType.INSTANCE_CONFIG); + ImmutableSet.of( + HelixConstants.ChangeType.RESOURCE_CONFIG, + HelixConstants.ChangeType.CLUSTER_CONFIG, + HelixConstants.ChangeType.INSTANCE_CONFIG); // The cluster change detector is a stateful object. // Make it static to avoid unnecessary reinitialization. private static final ThreadLocal CHANGE_DETECTOR_THREAD_LOCAL = new ThreadLocal<>(); private final MappingCalculator _mappingCalculator; - - // --------- The following fields are placeholders and need replacement. -----------// - // TODO Shall we make the metadata store a static threadlocal object as well to avoid - // reinitialization? private final AssignmentMetadataStore _assignmentMetadataStore; private final RebalanceAlgorithm _rebalanceAlgorithm; - // ------------------------------------------------------------------------------------// + + private static AssignmentMetadataStore constructAssignmentStore(HelixManager helixManager) { + AssignmentMetadataStore assignmentMetadataStore = null; + if (helixManager != null) { + String metadataStoreAddrs = helixManager.getMetadataStoreConnectionString(); + String clusterName = helixManager.getClusterName(); + if (metadataStoreAddrs != null && clusterName != null) { + assignmentMetadataStore = new AssignmentMetadataStore(metadataStoreAddrs, clusterName); + } + } + return assignmentMetadataStore; + } public WagedRebalancer(HelixManager helixManager, Map preferences) { - this( - // TODO init the metadata store according to their requirement when integrate, - // or change to final static method if possible. 
- new AssignmentMetadataStore(helixManager), ConstraintBasedAlgorithmFactory.getInstance(preferences), + this(constructAssignmentStore(helixManager), + ConstraintBasedAlgorithmFactory.getInstance(preferences), // Use DelayedAutoRebalancer as the mapping calculator for the final assignment output. // Mapping calculator will translate the best possible assignment into the applicable state // mapping based on the current states. @@ -94,6 +102,10 @@ public WagedRebalancer(HelixManager helixManager, private WagedRebalancer(AssignmentMetadataStore assignmentMetadataStore, RebalanceAlgorithm algorithm, MappingCalculator mappingCalculator) { + if (assignmentMetadataStore == null) { + LOG.warn("Assignment Metadata Store is not configured properly." + + " The rebalancer will not access the assignment store during the rebalance."); + } _assignmentMetadataStore = assignmentMetadataStore; _rebalanceAlgorithm = algorithm; _mappingCalculator = mappingCalculator; @@ -103,7 +115,13 @@ private WagedRebalancer(AssignmentMetadataStore assignmentMetadataStore, protected WagedRebalancer(AssignmentMetadataStore assignmentMetadataStore, RebalanceAlgorithm algorithm) { this(assignmentMetadataStore, algorithm, new DelayedAutoRebalancer()); + } + // Release all the resources. + public void close() { + if (_assignmentMetadataStore != null) { + _assignmentMetadataStore.close(); + } } /** @@ -117,27 +135,18 @@ protected WagedRebalancer(AssignmentMetadataStore assignmentMetadataStore, public Map computeNewIdealStates(ResourceControllerDataProvider clusterData, Map resourceMap, final CurrentStateOutput currentStateOutput) throws HelixRebalanceException { - LOG.info("Start computing new ideal states for resources: {}", resourceMap.keySet().toString()); - - // Find the compatible resources: 1. FULL_AUTO 2. 
Configured to use the WAGED rebalancer - resourceMap = resourceMap.entrySet().stream().filter(resourceEntry -> { - IdealState is = clusterData.getIdealState(resourceEntry.getKey()); - return is != null && is.getRebalanceMode().equals(IdealState.RebalanceMode.FULL_AUTO) - && getClass().getName().equals(is.getRebalancerClassName()); - }).collect(Collectors.toMap(resourceEntry -> resourceEntry.getKey(), - resourceEntry -> resourceEntry.getValue())); - if (resourceMap.isEmpty()) { - LOG.warn("There is no valid resource to be rebalanced by {}", + LOG.warn("There is no resource to be rebalanced by {}", this.getClass().getSimpleName()); return Collections.emptyMap(); - } else { - LOG.info("Valid resources that will be rebalanced by {}: {}", this.getClass().getSimpleName(), - resourceMap.keySet().toString()); } + LOG.info("Start computing new ideal states for resources: {}", resourceMap.keySet().toString()); + validateInput(clusterData, resourceMap); + // Calculate the target assignment based on the current cluster status. - Map newIdealStates = computeBestPossibleStates(clusterData, resourceMap); + Map newIdealStates = + computeBestPossibleStates(clusterData, resourceMap, currentStateOutput); // Construct the new best possible states according to the current state and target assignment. // Note that the new ideal state might be an intermediate state between the current state and @@ -166,28 +175,29 @@ public Map computeNewIdealStates(ResourceControllerDataProvi // Coordinate baseline recalculation and partial rebalance according to the cluster changes. 
private Map computeBestPossibleStates( - ResourceControllerDataProvider clusterData, Map resourceMap) - throws HelixRebalanceException { + ResourceControllerDataProvider clusterData, Map resourceMap, + final CurrentStateOutput currentStateOutput) throws HelixRebalanceException { getChangeDetector().updateSnapshots(clusterData); - // Get all the modified and new items' information + // Get all the changed items' information Map> clusterChanges = getChangeDetector().getChangeTypes().stream() .collect(Collectors.toMap(changeType -> changeType, changeType -> { Set itemKeys = new HashSet<>(); itemKeys.addAll(getChangeDetector().getAdditionsByType(changeType)); itemKeys.addAll(getChangeDetector().getChangesByType(changeType)); + itemKeys.addAll(getChangeDetector().getRemovalsByType(changeType)); return itemKeys; })); if (clusterChanges.keySet().stream() .anyMatch(changeType -> GLOBAL_REBALANCE_REQUIRED_CHANGE_TYPES.contains(changeType))) { - refreshBaseline(clusterData, clusterChanges, resourceMap); + refreshBaseline(clusterData, clusterChanges, resourceMap, currentStateOutput); // Inject a cluster config change for large scale partial rebalance once the baseline changed. clusterChanges.putIfAbsent(HelixConstants.ChangeType.CLUSTER_CONFIG, Collections.emptySet()); } Map newAssignment = - partialRebalance(clusterData, clusterChanges, resourceMap); + partialRebalance(clusterData, clusterChanges, resourceMap, currentStateOutput); // Convert the assignments into IdealState for the following state mapping calculation. 
Map finalIdealState = new HashMap<>(); @@ -213,56 +223,60 @@ private Map computeBestPossibleStates( // TODO make the Baseline calculation async if complicated algorithm is used for the Baseline private void refreshBaseline(ResourceControllerDataProvider clusterData, - Map> clusterChanges, Map resourceMap) - throws HelixRebalanceException { + Map> clusterChanges, Map resourceMap, + final CurrentStateOutput currentStateOutput) throws HelixRebalanceException { + LOG.info("Start calculating the new baseline."); + Map currentBaseline = + getBaselineAssignment(_assignmentMetadataStore, currentStateOutput, resourceMap.keySet()); // For baseline calculation // 1. Ignore node status (disable/offline). // 2. Use the baseline as the previous best possible assignment since there is no "baseline" for // the baseline. - LOG.info("Start calculating the new baseline."); - Map currentBaseline; - try { - currentBaseline = _assignmentMetadataStore.getBaseline(); - } catch (Exception ex) { - throw new HelixRebalanceException("Failed to get the current baseline assignment.", - HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex); - } - Map baseline = calculateAssignment(clusterData, clusterChanges, - resourceMap, clusterData.getAllInstances(), Collections.emptyMap(), currentBaseline); - try { - _assignmentMetadataStore.persistBaseline(baseline); - } catch (Exception ex) { - throw new HelixRebalanceException("Failed to persist the new baseline assignment.", - HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex); + Map newBaseline = + calculateAssignment(clusterData, clusterChanges, resourceMap, clusterData.getAllInstances(), + Collections.emptyMap(), currentBaseline); + + if (_assignmentMetadataStore != null) { + try { + _assignmentMetadataStore.persistBaseline(newBaseline); + } catch (Exception ex) { + throw new HelixRebalanceException("Failed to persist the new baseline assignment.", + HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex); + } + } else { + 
LOG.debug("Assignment Metadata Store is empty. Skip persist the baseline assignment."); } + LOG.info("Finish calculating the new baseline."); } private Map partialRebalance( ResourceControllerDataProvider clusterData, - Map> clusterChanges, Map resourceMap) - throws HelixRebalanceException { + Map> clusterChanges, Map resourceMap, + final CurrentStateOutput currentStateOutput) throws HelixRebalanceException { LOG.info("Start calculating the new best possible assignment."); - Set activeInstances = clusterData.getEnabledLiveInstances(); - Map baseline; - Map prevBestPossibleAssignment; - try { - baseline = _assignmentMetadataStore.getBaseline(); - prevBestPossibleAssignment = _assignmentMetadataStore.getBestPossibleAssignment(); - } catch (Exception ex) { - throw new HelixRebalanceException("Failed to get the persisted assignment records.", - HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex); - } - Map newAssignment = calculateAssignment(clusterData, clusterChanges, - resourceMap, activeInstances, baseline, prevBestPossibleAssignment); - try { - // TODO Test to confirm if persisting the final assignment (with final partition states) - // would be a better option. 
- _assignmentMetadataStore.persistBestPossibleAssignment(newAssignment); - } catch (Exception ex) { - throw new HelixRebalanceException("Failed to persist the new best possible assignment.", - HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex); + Map currentBaseline = + getBaselineAssignment(_assignmentMetadataStore, currentStateOutput, resourceMap.keySet()); + Map currentBestPossibleAssignment = + getBestPossibleAssignment(_assignmentMetadataStore, currentStateOutput, + resourceMap.keySet()); + Map newAssignment = + calculateAssignment(clusterData, clusterChanges, resourceMap, + clusterData.getEnabledLiveInstances(), currentBaseline, currentBestPossibleAssignment); + + if (_assignmentMetadataStore != null) { + try { + // TODO Test to confirm if persisting the final assignment (with final partition states) + // would be a better option. + _assignmentMetadataStore.persistBestPossibleAssignment(newAssignment); + } catch (Exception ex) { + throw new HelixRebalanceException("Failed to persist the new best possible assignment.", + HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex); + } + } else { + LOG.debug("Assignment Metadata Store is empty. 
Skip persist the baseline assignment."); } + LOG.info("Finish calculating the new best possible assignment."); return newAssignment; } @@ -348,4 +362,100 @@ private Map> getPreferenceLists(ResourceAssignment newAssig } return preferenceList; } + + private void validateInput(ResourceControllerDataProvider clusterData, + Map resourceMap) throws HelixRebalanceException { + Set nonCompatibleResources = resourceMap.entrySet().stream().filter(resourceEntry -> { + IdealState is = clusterData.getIdealState(resourceEntry.getKey()); + return is == null || !is.getRebalanceMode().equals(IdealState.RebalanceMode.FULL_AUTO) + || !getClass().getName().equals(is.getRebalancerClassName()); + }).map(Map.Entry::getKey).collect(Collectors.toSet()); + if (!nonCompatibleResources.isEmpty()) { + throw new HelixRebalanceException(String.format( + "Input contains invalid resource(s) that cannot be rebalanced by the WAGED rebalancer. %s", + nonCompatibleResources.toString()), HelixRebalanceException.Type.INVALID_INPUT); + } + } + + /** + * @param assignmentMetadataStore + * @param currentStateOutput + * @param resources + * @return The current baseline assignment. If record does not exist in the + * assignmentMetadataStore, return the current state assignment. + * @throws HelixRebalanceException + */ + private Map getBaselineAssignment( + AssignmentMetadataStore assignmentMetadataStore, CurrentStateOutput currentStateOutput, + Set resources) throws HelixRebalanceException { + Map currentBaseline = Collections.emptyMap(); + if (assignmentMetadataStore != null) { + try { + currentBaseline = assignmentMetadataStore.getBaseline(); + } catch (HelixException ex) { + // Report error. and use empty mapping instead. 
+ LOG.error("Failed to get the current baseline assignment.", ex); + } catch (Exception ex) { + throw new HelixRebalanceException( + "Failed to get the current baseline assignment because of unexpected error.", + HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex); + } + } + if (currentBaseline.isEmpty()) { + LOG.warn( + "The current baseline assignment record is empty. Use the current states instead."); + currentBaseline = getCurrentStateAssingment(currentStateOutput, resources); + } + return currentBaseline; + } + + /** + * @param assignmentMetadataStore + * @param currentStateOutput + * @param resources + * @return The current best possible assignment. If record does not exist in the + * assignmentMetadataStore, return the current state assignment. + * @throws HelixRebalanceException + */ + private Map getBestPossibleAssignment( + AssignmentMetadataStore assignmentMetadataStore, CurrentStateOutput currentStateOutput, + Set resources) throws HelixRebalanceException { + Map currentBestAssignment = Collections.emptyMap(); + if (assignmentMetadataStore != null) { + try { + currentBestAssignment = assignmentMetadataStore.getBestPossibleAssignment(); + } catch (HelixException ex) { + // Report error. and use empty mapping instead. + LOG.error("Failed to get the current best possible assignment.", ex); + } catch (Exception ex) { + throw new HelixRebalanceException( + "Failed to get the current best possible assignment because of unexpected error.", + HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex); + } + } + if (currentBestAssignment.isEmpty()) { + LOG.warn( + "The current best possible assignment record is empty. 
Use the current states instead."); + currentBestAssignment = getCurrentStateAssingment(currentStateOutput, resources); + } + return currentBestAssignment; + } + + private Map getCurrentStateAssingment( + CurrentStateOutput currentStateOutput, Set resourceSet) { + Map currentStateAssignment = new HashMap<>(); + for (String resourceName : resourceSet) { + Map> currentStateMap = + currentStateOutput.getCurrentStateMap(resourceName); + if (!currentStateMap.isEmpty()) { + ResourceAssignment newResourceAssignment = new ResourceAssignment(resourceName); + currentStateMap.entrySet().stream().forEach(currentStateEntry -> { + newResourceAssignment + .addReplicaMap(currentStateEntry.getKey(), currentStateEntry.getValue()); + }); + currentStateAssignment.put(resourceName, newResourceAssignment); + } + } + return currentStateAssignment; + } } diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java index 89a3f29662..1a41aef501 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java @@ -29,6 +29,7 @@ import java.util.function.Function; import java.util.stream.Collectors; +import com.google.common.collect.Maps; import org.apache.helix.HelixRebalanceException; import org.apache.helix.controller.rebalancer.waged.RebalanceAlgorithm; import org.apache.helix.controller.rebalancer.waged.model.AssignableNode; @@ -36,11 +37,10 @@ import org.apache.helix.controller.rebalancer.waged.model.ClusterContext; import org.apache.helix.controller.rebalancer.waged.model.ClusterModel; import org.apache.helix.controller.rebalancer.waged.model.OptimalAssignment; +import org.apache.helix.model.ResourceAssignment; import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; -import com.google.common.collect.Maps; - /** * The algorithm is based on a given set of constraints * - HardConstraint: Approve or deny the assignment given its condition, any assignment cannot @@ -64,29 +64,26 @@ class ConstraintBasedAlgorithm implements RebalanceAlgorithm { @Override public OptimalAssignment calculate(ClusterModel clusterModel) throws HelixRebalanceException { OptimalAssignment optimalAssignment = new OptimalAssignment(); - Map> replicasByResource = clusterModel.getAssignableReplicaMap(); List nodes = new ArrayList<>(clusterModel.getAssignableNodes().values()); - - // TODO: different orders of resource/replica could lead to different greedy assignments, will - // revisit and improve the performance - for (String resource : replicasByResource.keySet()) { - for (AssignableReplica replica : replicasByResource.get(resource)) { - Optional maybeBestNode = - getNodeWithHighestPoints(replica, nodes, clusterModel.getContext(), optimalAssignment); - // stop immediately if any replica cannot find best assignable node - if (optimalAssignment.hasAnyFailure()) { - String errorMessage = String.format( - "Unable to find any available candidate node for partition %s; Fail reasons: %s", - replica.getPartitionName(), optimalAssignment.getFailures()); - throw new HelixRebalanceException(errorMessage, - HelixRebalanceException.Type.FAILED_TO_CALCULATE); - } - maybeBestNode.ifPresent(node -> clusterModel.assign(replica.getResourceName(), - replica.getPartitionName(), replica.getReplicaState(), node.getInstanceName())); + // Sort the replicas so the input is stable for the greedy algorithm. + // For the other algorithm implementation, this sorting could be unnecessary. 
+ for (AssignableReplica replica : getOrderedAssignableReplica(clusterModel)) { + Optional maybeBestNode = + getNodeWithHighestPoints(replica, nodes, clusterModel.getContext(), optimalAssignment); + // stop immediately if any replica cannot find best assignable node + if (optimalAssignment.hasAnyFailure()) { + String errorMessage = String.format( + "Unable to find any available candidate node for partition %s; Fail reasons: %s", + replica.getPartitionName(), optimalAssignment.getFailures()); + throw new HelixRebalanceException(errorMessage, + HelixRebalanceException.Type.FAILED_TO_CALCULATE); } + maybeBestNode.ifPresent(node -> clusterModel + .assign(replica.getResourceName(), replica.getPartitionName(), replica.getReplicaState(), + node.getInstanceName())); } - - return optimalAssignment.convertFrom(clusterModel); + optimalAssignment.updateAssignments(clusterModel); + return optimalAssignment; } private Optional getNodeWithHighestPoints(AssignableReplica replica, @@ -133,4 +130,55 @@ private List convertFailureReasons(List hardConstraints) return hardConstraints.stream().map(HardConstraint::getDescription) .collect(Collectors.toList()); } + + // TODO investigate better ways to sort replicas. One option is sorting based on the creation time. + private List getOrderedAssignableReplica(ClusterModel clusterModel) { + Map> replicasByResource = clusterModel.getAssignableReplicaMap(); + List orderedAssignableReplicas = + replicasByResource.values().stream().flatMap(replicas -> replicas.stream()) + .collect(Collectors.toList()); + + Map bestPossibleAssignment = + clusterModel.getContext().getBestPossibleAssignment(); + Map baselineAssignment = + clusterModel.getContext().getBaselineAssignment(); + + // 1. Sort according to whether the assignment exists in the best possible and/or baseline assignment + // 2. Sort according to the state priority. Note that prioritizing the top state is required. + // Or the greedy algorithm will unnecessarily shuffle the states between replicas. 
+ // 3. Sort according to the resource/partition name. + orderedAssignableReplicas.sort((replica1, replica2) -> { + String resourceName1 = replica1.getResourceName(); + String resourceName2 = replica2.getResourceName(); + if (bestPossibleAssignment.containsKey(resourceName1) == bestPossibleAssignment + .containsKey(resourceName2)) { + if (baselineAssignment.containsKey(resourceName1) == baselineAssignment + .containsKey(resourceName2)) { + // If both resources are equally present in (or absent from) the existing assignments, + // compare for additional dimensions. + int statePriority1 = replica1.getStatePriority(); + int statePriority2 = replica2.getStatePriority(); + if (statePriority1 == statePriority2) { + // If the state priorities are the same, compare the names. + if (resourceName1.equals(resourceName2)) { + return replica1.getPartitionName().compareTo(replica2.getPartitionName()); + } else { + return resourceName1.compareTo(resourceName2); + } + } else { + // Note we shall prioritize the replica with a higher state priority, + // the smaller priority number means higher priority. + return statePriority1 - statePriority2; + } + } else { + // If the baseline assignment contains the assignment, prioritize the replica. + return baselineAssignment.containsKey(resourceName1) ? -1 : 1; + } + } else { + // If the best possible assignment contains the assignment, prioritize the replica. + return bestPossibleAssignment.containsKey(resourceName1) ? 
-1 : 1; + } + }); + return orderedAssignableReplicas; + } } diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/NodeMaxPartitionLimitConstraint.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/NodeMaxPartitionLimitConstraint.java index 9d0752be6c..cda53292f3 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/NodeMaxPartitionLimitConstraint.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/NodeMaxPartitionLimitConstraint.java @@ -28,9 +28,12 @@ class NodeMaxPartitionLimitConstraint extends HardConstraint { @Override boolean isAssignmentValid(AssignableNode node, AssignableReplica replica, ClusterContext clusterContext) { - return node.getAssignedReplicaCount() < node.getMaxPartition() - && node.getAssignedPartitionsByResource(replica.getResourceName()).size() < replica - .getResourceMaxPartitionsPerInstance(); + boolean exceedMaxPartitionLimit = + node.getMaxPartition() < 0 || node.getAssignedReplicaCount() < node.getMaxPartition(); + boolean exceedResourceMaxPartitionLimit = replica.getResourceMaxPartitionsPerInstance() < 0 + || node.getAssignedPartitionsByResource(replica.getResourceName()).size() < replica + .getResourceMaxPartitionsPerInstance(); + return exceedMaxPartitionLimit && exceedResourceMaxPartitionLimit; } @Override diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java index a3460fb5cb..20de6da401 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java @@ -19,8 +19,6 @@ * under the License. 
*/ -import static java.lang.Math.max; - import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -287,16 +285,23 @@ public int getMaxPartition() { * Any missing field will cause an invalid topology config exception. */ private String computeFaultZone(ClusterConfig clusterConfig, InstanceConfig instanceConfig) { - if (clusterConfig.isTopologyAwareEnabled()) { - String topologyStr = clusterConfig.getTopology(); - String faultZoneType = clusterConfig.getFaultZoneType(); - if (topologyStr == null || faultZoneType == null) { - throw new HelixException("Fault zone or cluster topology information is not configured."); - } - + if (!clusterConfig.isTopologyAwareEnabled()) { + // Instance name is the default fault zone if topology awareness is false. + return instanceConfig.getInstanceName(); + } + String topologyStr = clusterConfig.getTopology(); + String faultZoneType = clusterConfig.getFaultZoneType(); + if (topologyStr == null || faultZoneType == null) { + LOG.debug("Topology configuration is not complete. Topology define: {}, Fault Zone Type: {}", + topologyStr, faultZoneType); + // Use the instance name, or the deprecated ZoneId field (if exists) as the default fault zone. + String zoneId = instanceConfig.getZoneId(); + return zoneId == null ? instanceConfig.getInstanceName() : zoneId; + } else { + // Get the fault zone information from the complete topology definition. 
String[] topologyDef = topologyStr.trim().split("/"); - if (topologyDef.length == 0 - || Arrays.stream(topologyDef).noneMatch(type -> type.equals(faultZoneType))) { + if (topologyDef.length == 0 || + Arrays.stream(topologyDef).noneMatch(type -> type.equals(faultZoneType))) { throw new HelixException( "The configured topology definition is empty or does not contain the fault zone type."); } @@ -324,10 +329,6 @@ private String computeFaultZone(ClusterConfig clusterConfig, InstanceConfig inst } return faultZoneStringBuilder.toString(); } - } else { - // For backward compatibility - String zoneId = instanceConfig.getZoneId(); - return zoneId == null ? instanceConfig.getInstanceName() : zoneId; } } @@ -356,7 +357,7 @@ private void updateCapacityAndUtilization(String capacityKey, int valueToSubtrac // For the purpose of constraint calculation, the max utilization cannot be larger than 100%. float utilization = Math.min( (float) (_maxCapacity.get(capacityKey) - newCapacity) / _maxCapacity.get(capacityKey), 1); - _highestCapacityUtilization = max(_highestCapacityUtilization, utilization); + _highestCapacityUtilization = Math.max(_highestCapacityUtilization, utilization); } // else if the capacityKey does not exist in the capacity map, this method essentially becomes // a NOP; in other words, this node will be treated as if it has unlimited capacity. 
@@ -394,4 +395,9 @@ public int hashCode() { public int compareTo(AssignableNode o) { return _instanceName.compareTo(o.getInstanceName()); } + + @Override + public String toString() { + return _instanceName; + } } diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java index 66bd7b774e..a651e1924d 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java @@ -20,17 +20,22 @@ */ import java.io.IOException; +import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.helix.model.ClusterConfig; import org.apache.helix.model.ResourceConfig; import org.apache.helix.model.StateModelDefinition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * This class represents a partition replication that needs to be allocated. */ public class AssignableReplica implements Comparable { + private static final Logger LOG = LoggerFactory.getLogger(AssignableReplica.class); + private final String _partitionName; private final String _resourceName; private final String _resourceInstanceGroupTag; @@ -149,9 +154,10 @@ private Map fetchCapacityUsage(String partitionName, partitionCapacity = capacityMap.get(ResourceConfig.DEFAULT_PARTITION_KEY); } if (partitionCapacity == null) { - throw new IllegalArgumentException(String.format( - "The capacity usage of the specified partition %s is not configured in the Resource Config %s. No default partition capacity is configured neither.", - partitionName, resourceConfig.getResourceName())); + LOG.warn("The capacity usage of the specified partition {} is not configured in the Resource" + + " Config {}. No default partition capacity is configured either. 
Will proceed with" + + " empty capacity configuration.", partitionName, resourceConfig.getResourceName()); + partitionCapacity = new HashMap<>(); } List requiredCapacityKeys = clusterConfig.getInstanceCapacityKeys(); diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java index af1a8d8d02..2b534220d3 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java @@ -200,6 +200,9 @@ private static Map> parseAllReplicas( for (String resourceName : resourceMap.keySet()) { ResourceConfig resourceConfig = dataProvider.getResourceConfig(resourceName); + if (resourceConfig == null) { + resourceConfig = new ResourceConfig(resourceName); + } IdealState is = dataProvider.getIdealState(resourceName); if (is == null) { throw new HelixException( @@ -223,6 +226,7 @@ private static Map> parseAllReplicas( for (Map.Entry entry : stateCountMap.entrySet()) { String state = entry.getKey(); for (int i = 0; i < entry.getValue(); i++) { + mergeIdealStateWithResourceConfig(resourceConfig, is); totalReplicaMap.computeIfAbsent(resourceName, key -> new HashSet<>()).add( new AssignableReplica(clusterConfig, resourceConfig, partition, state, def.getStatePriorityMap().get(state))); @@ -233,6 +237,32 @@ private static Map> parseAllReplicas( return totalReplicaMap; } + /** + * For backward compatibility, propagate the critical simple fields from the IdealState to + * the Resource Config. + * Eventually, Resource Config should be the only metadata node that contains the required information. 
+ */ + private static void mergeIdealStateWithResourceConfig(ResourceConfig resourceConfig, + final IdealState idealState) { + // Note that the config fields get updated in this method shall be fully compatible with ones in the IdealState. + // 1. The fields shall have exactly the same meaning. + // 2. The value shall be exactly compatible, no additional calculation involved. + // 3. Resource Config items have a high priority. + // This is to ensure the resource config is not polluted after the merge. + if (null == resourceConfig.getRecord() + .getSimpleField(ResourceConfig.ResourceConfigProperty.INSTANCE_GROUP_TAG.name())) { + resourceConfig.getRecord() + .setSimpleField(ResourceConfig.ResourceConfigProperty.INSTANCE_GROUP_TAG.name(), + idealState.getInstanceGroupTag()); + } + if (null == resourceConfig.getRecord() + .getSimpleField(ResourceConfig.ResourceConfigProperty.MAX_PARTITIONS_PER_INSTANCE.name())) { + resourceConfig.getRecord() + .setIntField(ResourceConfig.ResourceConfigProperty.MAX_PARTITIONS_PER_INSTANCE.name(), + idealState.getMaxPartitionsPerInstance()); + } + } + /** * @return A map contains the assignments for each fault zone. > */ diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/OptimalAssignment.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/OptimalAssignment.java index 31cb1818b2..138f30c58e 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/OptimalAssignment.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/OptimalAssignment.java @@ -19,38 +19,64 @@ * under the License. 
*/ +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.helix.HelixException; +import org.apache.helix.model.Partition; import org.apache.helix.model.ResourceAssignment; /** * The data model represents the optimal assignment of N replicas assigned to M instances; * It's mostly used as the return parameter of an assignment calculation algorithm; If the algorithm - * failed to find optimal assignment given the endeavor, the user could check the failure reasons + * failed to find optimal assignment given the endeavor, the user could check the failure reasons. + * Note that this class is not thread safe. */ public class OptimalAssignment { private Map> _optimalAssignment = new HashMap<>(); private Map>> _failedAssignments = new HashMap<>(); - public OptimalAssignment() { - - } - + /** + * Update the OptimalAssignment instance with the existing assignment recorded in the input cluster model. + * + * @param clusterModel + */ public void updateAssignments(ClusterModel clusterModel) { - + _optimalAssignment.clear(); + clusterModel.getAssignableNodes().values().stream() + .forEach(node -> _optimalAssignment.put(node, new ArrayList<>(node.getAssignedReplicas()))); } - // TODO: determine the output of final assignment format + /** + * @return The optimal assignment in the form of a map. + */ public Map getOptimalResourceAssignment() { - throw new UnsupportedOperationException("Not implemented yet"); - } - - // TODO: the convert method is not the best choice so far, will revisit the data model - public OptimalAssignment convertFrom(ClusterModel clusterModel) { - return this; + if (hasAnyFailure()) { + throw new HelixException( + "Cannot get the optimal resource assignment since a calculation failure is recorded. 
" + + getFailures()); + } + Map assignmentMap = new HashMap<>(); + for (AssignableNode node : _optimalAssignment.keySet()) { + for (AssignableReplica replica : _optimalAssignment.get(node)) { + String resourceName = replica.getResourceName(); + Partition partition = new Partition(replica.getPartitionName()); + ResourceAssignment resourceAssignment = assignmentMap + .computeIfAbsent(resourceName, key -> new ResourceAssignment(resourceName)); + Map partitionStateMap = resourceAssignment.getReplicaMap(partition); + if (partitionStateMap.isEmpty()) { + // ResourceAssignment returns immutable empty map while no such assignment recorded yet. + // So if the returned map is empty, create a new map. + partitionStateMap = new HashMap<>(); + } + partitionStateMap.put(node.getInstanceName(), replica.getReplicaState()); + resourceAssignment.addReplicaMap(partition, partitionStateMap); + } + } + return assignmentMap; } public void recordAssignmentFailure(AssignableReplica replica, diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java index 4df8e8dde2..8ce60e702f 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java @@ -19,6 +19,14 @@ * under the License. 
*/ +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.stream.Collectors; + import org.apache.helix.HelixException; import org.apache.helix.HelixManager; import org.apache.helix.HelixRebalanceException; @@ -48,13 +56,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.concurrent.Callable; - /** * For partition compute best possible (instance,state) pair based on * IdealState,StateModel,LiveInstance @@ -114,67 +115,46 @@ private BestPossibleStateOutput compute(ClusterEvent event, Map newIdealStates = new HashMap<>(); - Map preferences = cache.getClusterConfig() - .getGlobalRebalancePreference(); - WagedRebalancer wagedRebalancer = new WagedRebalancer(helixManager, preferences); - try { - newIdealStates - .putAll(wagedRebalancer.computeNewIdealStates(cache, resourceMap, currentStateOutput)); - } catch (HelixRebalanceException ex) { - // Note that unlike the legacy rebalancer, the WAGED rebalance won't return partial result. - // Since it calculates for all the eligible resources globally, a partial result is invalid. - // TODO propagate the rebalancer failure information to updateRebalanceStatus for monitoring. 
- LogUtil.logError(logger, _eventId, String - .format("Failed to calculate the new Ideal States using the rebalancer %s due to %s", - wagedRebalancer.getClass().getSimpleName(), ex.getFailureType()), ex); - } + boolean isValid = + validateOfflineInstancesLimit(cache, event.getAttribute(AttributeName.helixmanager.name())); final List failureResources = new ArrayList<>(); - Iterator itr = resourceMap.values().iterator(); + + Map calculatedResourceMap = + computeResourceBestPossibleStateWithWagedRebalancer(cache, currentStateOutput, helixManager, + resourceMap, output, failureResources); + + Map remainingResourceMap = new HashMap<>(resourceMap); + remainingResourceMap.keySet().removeAll(calculatedResourceMap.keySet()); + + // Fallback to the original single resource rebalancer calculation. + // This is required because we support mixed cluster that uses both WAGED rebalancer and the + // older rebalancers. + Iterator itr = remainingResourceMap.values().iterator(); while (itr.hasNext()) { Resource resource = itr.next(); boolean result = false; - IdealState is = newIdealStates.get(resource.getResourceName()); - if (is != null) { - // 2. Check if the WAGED rebalancer has calculated for this resource or not. - result = checkBestPossibleStateCalculation(is); - if (result) { - // The WAGED rebalancer calculates a valid result, record in the output - updateBestPossibleStateOutput(output, resource, is); - } - } else { - // 3. The WAGED rebalancer skips calculating the resource assignment, fallback to use a - // legacy resource rebalancer if applicable. - // If this calculation fails, the resource will be reported in the failureResources list. 
- try { - result = - computeSingleResourceBestPossibleState(event, cache, currentStateOutput, resource, - output); - } catch (HelixException ex) { - LogUtil.logError(logger, _eventId, - "Exception when calculating best possible states for " + resource.getResourceName(), - ex); - } + try { + result = computeSingleResourceBestPossibleState(event, cache, currentStateOutput, resource, + output); + } catch (HelixException ex) { + LogUtil.logError(logger, _eventId, String + .format("Exception when calculating best possible states for %s", + resource.getResourceName()), ex); + } if (!result) { failureResources.add(resource.getResourceName()); - LogUtil.logWarn(logger, _eventId, - "Failed to calculate best possible states for " + resource.getResourceName()); + LogUtil.logWarn(logger, _eventId, String + .format("Failed to calculate best possible states for %s", resource.getResourceName())); } } // Check and report if resource rebalance has failure updateRebalanceStatus(!isValid || !failureResources.isEmpty(), failureResources, helixManager, - cache, clusterStatusMonitor, - "Failed to calculate best possible states for " + failureResources.size() + " resources."); + cache, clusterStatusMonitor, String + .format("Failed to calculate best possible states for %d resources.", + failureResources.size())); return output; } @@ -238,6 +218,70 @@ private boolean validateOfflineInstancesLimit(final ResourceControllerDataProvid return true; } + /** + * Rebalance with the WAGED rebalancer + * The rebalancer only calculates the new ideal assignment for all the resources that are + * configured to use the WAGED rebalancer. + * + * @param cache Cluster data cache. + * @param currentStateOutput The current state information. + * @param helixManager + * @param resourceMap The complete resource map. The method will filter the map for the compatible resources. + * @param output The best possible state output. 
+ * @param failureResources The failure records that will be updated if any resource cannot be computed. + * @return The map of all the calculated resources. + */ + private Map computeResourceBestPossibleStateWithWagedRebalancer( + ResourceControllerDataProvider cache, CurrentStateOutput currentStateOutput, + HelixManager helixManager, Map resourceMap, BestPossibleStateOutput output, + List failureResources) { + // Find the compatible resources: 1. FULL_AUTO 2. Configured to use the WAGED rebalancer + Map wagedRebalancedResourceMap = + resourceMap.entrySet().stream().filter(resourceEntry -> { + IdealState is = cache.getIdealState(resourceEntry.getKey()); + return is != null && is.getRebalanceMode().equals(IdealState.RebalanceMode.FULL_AUTO) + && WagedRebalancer.class.getName().equals(is.getRebalancerClassName()); + }).collect(Collectors.toMap(resourceEntry -> resourceEntry.getKey(), + resourceEntry -> resourceEntry.getValue())); + + Map newIdealStates = new HashMap<>(); + + // Init rebalancer with the rebalance preferences. + Map preferences = cache.getClusterConfig() + .getGlobalRebalancePreference(); + // TODO avoid creating the rebalancer on every rebalance call for performance enhancement + WagedRebalancer wagedRebalancer = new WagedRebalancer(helixManager, preferences); + try { + newIdealStates.putAll(wagedRebalancer + .computeNewIdealStates(cache, wagedRebalancedResourceMap, currentStateOutput)); + } catch (HelixRebalanceException ex) { + // Note that unlike the legacy rebalancer, the WAGED rebalance won't return partial result. + // Since it calculates for all the eligible resources globally, a partial result is invalid. + // TODO propagate the rebalancer failure information to updateRebalanceStatus for monitoring. 
+ LogUtil.logError(logger, _eventId, String + .format("Failed to calculate the new Ideal States using the rebalancer %s due to %s", + wagedRebalancer.getClass().getSimpleName(), ex.getFailureType()), ex); + } finally { + wagedRebalancer.close(); + } + Iterator itr = wagedRebalancedResourceMap.values().iterator(); + while (itr.hasNext()) { + Resource resource = itr.next(); + IdealState is = newIdealStates.get(resource.getResourceName()); + // Check if the WAGED rebalancer has calculated the result for this resource or not. + if (is != null && checkBestPossibleStateCalculation(is)) { + // The WAGED rebalancer calculates a valid result, record in the output + updateBestPossibleStateOutput(output, resource, is); + } else { + failureResources.add(resource.getResourceName()); + LogUtil.logWarn(logger, _eventId, String + .format("Failed to calculate best possible states for %s.", + resource.getResourceName())); + } + } + return wagedRebalancedResourceMap; + } + private void updateBestPossibleStateOutput(BestPossibleStateOutput output, Resource resource, IdealState computedIdealState) { output.setPreferenceLists(resource.getResourceName(), computedIdealState.getPreferenceLists()); diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java index 24c7c8e93b..a11da29f0e 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java @@ -25,6 +25,7 @@ import java.util.Arrays; import java.util.List; import org.I0Itec.zkclient.exception.ZkMarshallingError; +import org.I0Itec.zkclient.exception.ZkNoNodeException; import org.I0Itec.zkclient.serialize.ZkSerializer; import org.apache.helix.AccessOption; import org.apache.helix.BaseDataAccessor; @@ -215,7 +216,7 @@ private HelixProperty compressedBucketRead(String path) { ZNRecord metadataRecord = 
_znRecordBaseDataAccessor.get(path, null, AccessOption.PERSISTENT); if (metadataRecord == null) { - throw new HelixException( + throw new ZkNoNodeException( String.format("Metadata ZNRecord does not exist for path: %s", path)); } diff --git a/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java index 03338b4894..b9284b9a23 100644 --- a/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java +++ b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java @@ -30,9 +30,9 @@ import java.util.Map; import java.util.Set; import java.util.logging.Level; - import javax.management.MBeanServerConnection; import javax.management.ObjectName; + import org.I0Itec.zkclient.IZkStateListener; import org.I0Itec.zkclient.ZkServer; import org.apache.helix.BaseDataAccessor; @@ -54,6 +54,7 @@ import org.apache.helix.controller.pipeline.StageContext; import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer; import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy; +import org.apache.helix.controller.rebalancer.waged.WagedRebalancer; import org.apache.helix.controller.stages.AttributeName; import org.apache.helix.controller.stages.ClusterEvent; import org.apache.helix.manager.zk.ZKHelixAdmin; @@ -347,6 +348,19 @@ protected IdealState createResourceWithDelayedRebalance(String clusterName, Stri protected IdealState createResourceWithDelayedRebalance(String clusterName, String db, String stateModel, int numPartition, int replica, int minActiveReplica, long delay, String rebalanceStrategy) { + return createResource(clusterName, db, stateModel, numPartition, replica, minActiveReplica, + delay, DelayedAutoRebalancer.class.getName(), rebalanceStrategy); + } + + protected IdealState createResourceWithWagedRebalance(String clusterName, String db, + String stateModel, int numPartition, int replica, int minActiveReplica, long delay) { + return createResource(clusterName, db, 
stateModel, numPartition, replica, minActiveReplica, + delay, WagedRebalancer.class.getName(), null); + } + + private IdealState createResource(String clusterName, String db, String stateModel, + int numPartition, int replica, int minActiveReplica, long delay, String rebalancerClassName, + String rebalanceStrategy) { IdealState idealState = _gSetupTool.getClusterManagementTool().getResourceIdealState(clusterName, db); if (idealState == null) { @@ -362,7 +376,7 @@ protected IdealState createResourceWithDelayedRebalance(String clusterName, Stri if (delay > 0) { idealState.setRebalanceDelay(delay); } - idealState.setRebalancerClassName(DelayedAutoRebalancer.class.getName()); + idealState.setRebalancerClassName(rebalancerClassName); _gSetupTool.getClusterManagementTool().setResourceIdealState(clusterName, db, idealState); _gSetupTool.rebalanceStorageCluster(clusterName, db, replica); idealState = _gSetupTool.getClusterManagementTool().getResourceIdealState(clusterName, db); diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/MockAssignmentMetadataStore.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/MockAssignmentMetadataStore.java index 3371c8b4db..7d054160b3 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/MockAssignmentMetadataStore.java +++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/MockAssignmentMetadataStore.java @@ -23,6 +23,7 @@ import java.util.Map; import org.apache.helix.BucketDataAccessor; import org.apache.helix.model.ResourceAssignment; +import org.mockito.Mockito; /** * A mock up metadata store for unit test. 
@@ -32,8 +33,8 @@ public class MockAssignmentMetadataStore extends AssignmentMetadataStore { private Map _persistGlobalBaseline = new HashMap<>(); private Map _persistBestPossibleAssignment = new HashMap<>(); - MockAssignmentMetadataStore(BucketDataAccessor bucketDataAccessor, String clusterName) { - super(bucketDataAccessor, clusterName); + MockAssignmentMetadataStore() { + super(Mockito.mock(BucketDataAccessor.class), ""); } public Map getBaseline() { @@ -53,6 +54,10 @@ public void persistBestPossibleAssignment( _persistBestPossibleAssignment = bestPossibleAssignment; } + public void close() { + // do nothing + } + public void clearMetadataStore() { _persistBestPossibleAssignment.clear(); _persistGlobalBaseline.clear(); diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestAssignmentMetadataStore.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestAssignmentMetadataStore.java index 922915fdf4..ecd2af3769 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestAssignmentMetadataStore.java +++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestAssignmentMetadataStore.java @@ -20,6 +20,7 @@ */ import java.util.Map; + import org.apache.helix.HelixManager; import org.apache.helix.HelixManagerFactory; import org.apache.helix.InstanceType; @@ -28,6 +29,7 @@ import org.apache.helix.integration.manager.MockParticipantManager; import org.apache.helix.model.ResourceAssignment; import org.testng.Assert; +import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -79,7 +81,15 @@ public void beforeClass() throws Exception { _manager.connect(); // create AssignmentMetadataStore - _store = new AssignmentMetadataStore(_manager); + _store = new AssignmentMetadataStore(_manager.getMetadataStoreConnectionString(), + _manager.getClusterName()); + } + + @AfterClass + public void afterClass() { + if 
(_store != null) { + _store.close(); + } } /** @@ -91,11 +101,7 @@ public void beforeClass() throws Exception { */ @Test public void testReadEmptyBaseline() { - try { - Map baseline = _store.getBaseline(); - Assert.fail("Should fail because there shouldn't be any data."); - } catch (Exception e) { - // OK - } + Map baseline = _store.getBaseline(); + Assert.assertTrue(baseline.isEmpty()); } } diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java index d6fd99bf54..e7368be03e 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java +++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java @@ -26,7 +26,7 @@ import java.util.Map; import java.util.Set; import java.util.stream.Collectors; -import org.apache.helix.BucketDataAccessor; + import org.apache.helix.HelixConstants; import org.apache.helix.HelixRebalanceException; import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; @@ -47,8 +47,9 @@ import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; -import static org.mockito.Matchers.*; -import static org.mockito.Mockito.*; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.when; public class TestWagedRebalancer extends AbstractTestClusterModel { private Set _instances; @@ -63,9 +64,7 @@ public void initialize() { _algorithm = new MockRebalanceAlgorithm(); // Initialize a mock assignment metadata store - BucketDataAccessor mockAccessor = Mockito.mock(BucketDataAccessor.class); - String clusterName = ""; // an empty string for testing purposes - _metadataStore = new MockAssignmentMetadataStore(mockAccessor, clusterName); + _metadataStore = new MockAssignmentMetadataStore(); } @Override @@ -181,9 +180,9 @@ public 
void testRebalanceWithCurrentState() throws IOException, HelixRebalanceEx String resourceName = csEntry.getKey(); CurrentState cs = csEntry.getValue(); for (Map.Entry partitionStateEntry : cs.getPartitionStateMap().entrySet()) { - currentStateOutput.setCurrentState(resourceName, - new Partition(partitionStateEntry.getKey()), instanceName, - partitionStateEntry.getValue()); + currentStateOutput + .setCurrentState(resourceName, new Partition(partitionStateEntry.getKey()), + instanceName, partitionStateEntry.getValue()); } } } @@ -216,7 +215,7 @@ public void testRebalanceWithCurrentState() throws IOException, HelixRebalanceEx "DROPPED"); } - @Test(dependsOnMethods = "testRebalance") + @Test(dependsOnMethods = "testRebalance", expectedExceptions = HelixRebalanceException.class, expectedExceptionsMessageRegExp = "Input contains invalid resource\\(s\\) that cannot be rebalanced by the WAGED rebalancer. \\[Resource1\\] Failure Type: INVALID_INPUT") public void testNonCompatibleConfiguration() throws IOException, HelixRebalanceException { _metadataStore.clearMetadataStore(); WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore, _algorithm); @@ -233,12 +232,7 @@ public void testNonCompatibleConfiguration() throws IOException, HelixRebalanceE .forEach(partition -> resource.addPartition(partition)); return resource; })); - Map newIdealStates = - rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput()); - Map algorithmResult = _algorithm.getRebalanceResult(); - // The output shall not contains the nonCompatibleResource. - resourceMap.remove(nonCompatibleResourceName); - validateRebalanceResult(resourceMap, newIdealStates, algorithmResult); + rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput()); } // TODO test with invalid capacity configuration which will fail the cluster model constructing. 
@@ -283,7 +277,7 @@ public void testInvalidRebalancerStatus() throws IOException { Assert.assertEquals(ex.getFailureType(), HelixRebalanceException.Type.INVALID_REBALANCER_STATUS); Assert.assertEquals(ex.getMessage(), - "Failed to get the persisted assignment records. Failure Type: INVALID_REBALANCER_STATUS"); + "Failed to get the current baseline assignment because of unexpected error. Failure Type: INVALID_REBALANCER_STATUS"); } } @@ -425,8 +419,9 @@ private void validateRebalanceResult(Map resourceMap, Assert.assertTrue(newIdealStates.containsKey(resourceName)); IdealState is = newIdealStates.get(resourceName); ResourceAssignment assignment = expectedResult.get(resourceName); - Assert.assertEquals(is.getPartitionSet(), new HashSet<>(assignment.getMappedPartitions() - .stream().map(partition -> partition.getPartitionName()).collect(Collectors.toSet()))); + Assert.assertEquals(is.getPartitionSet(), new HashSet<>( + assignment.getMappedPartitions().stream().map(partition -> partition.getPartitionName()) + .collect(Collectors.toSet()))); for (String partitionName : is.getPartitionSet()) { Assert.assertEquals(is.getInstanceStateMap(partitionName), assignment.getReplicaMap(new Partition(partitionName))); diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/MockRebalanceAlgorithm.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/MockRebalanceAlgorithm.java index 2a39482076..759c6857f0 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/MockRebalanceAlgorithm.java +++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/MockRebalanceAlgorithm.java @@ -72,7 +72,7 @@ public OptimalAssignment calculate(ClusterModel clusterModel) { _resultHistory = result; - // TODO remove this mockito when OptimalAssignment.getOptimalResourceAssignment is ready. + // Mock the return value for supporting test. 
OptimalAssignment optimalAssignment = Mockito.mock(OptimalAssignment.class); when(optimalAssignment.getOptimalResourceAssignment()).thenReturn(result); return optimalAssignment; diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java index 0f799b36a2..91db0762b6 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java +++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java @@ -103,8 +103,8 @@ protected ResourceControllerDataProvider setupClusterDataCache() throws IOExcept ClusterConfig testClusterConfig = new ClusterConfig("testClusterConfigId"); testClusterConfig.setMaxPartitionsPerInstance(5); testClusterConfig.setDisabledInstances(Collections.emptyMap()); - testClusterConfig.setTopologyAwareEnabled(false); testClusterConfig.setInstanceCapacityKeys(new ArrayList<>(_capacityDataMap.keySet())); + testClusterConfig.setTopologyAwareEnabled(true); when(testCache.getClusterConfig()).thenReturn(testClusterConfig); // 3. Mock the live instance node for the default instance. diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestOptimalAssignment.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestOptimalAssignment.java new file mode 100644 index 0000000000..bd820a977c --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestOptimalAssignment.java @@ -0,0 +1,91 @@ +package org.apache.helix.controller.rebalancer.waged.model; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.Map; + +import org.apache.helix.HelixException; +import org.apache.helix.model.Partition; +import org.apache.helix.model.ResourceAssignment; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +public class TestOptimalAssignment extends ClusterModelTestHelper { + + @BeforeClass + public void initialize() { + super.initialize(); + } + + @Test + public void testUpdateAssignment() throws IOException { + OptimalAssignment assignment = new OptimalAssignment(); + + // update with empty cluster model + assignment.updateAssignments(getDefaultClusterModel()); + Map optimalAssignmentMap = + assignment.getOptimalResourceAssignment(); + Assert.assertEquals(optimalAssignmentMap, Collections.emptyMap()); + + // update with valid assignment + ClusterModel model = getDefaultClusterModel(); + model.assign(_resourceNames.get(0), _partitionNames.get(1), "SLAVE", _testInstanceId); + model.assign(_resourceNames.get(0), _partitionNames.get(0), "MASTER", _testInstanceId); + assignment.updateAssignments(model); + optimalAssignmentMap = assignment.getOptimalResourceAssignment(); + Assert.assertEquals(optimalAssignmentMap.get(_resourceNames.get(0)).getMappedPartitions(), + Arrays + .asList(new Partition(_partitionNames.get(0)), new 
Partition(_partitionNames.get(1)))); + Assert.assertEquals(optimalAssignmentMap.get(_resourceNames.get(0)) + .getReplicaMap(new Partition(_partitionNames.get(1))), + Collections.singletonMap(_testInstanceId, "SLAVE")); + Assert.assertEquals(optimalAssignmentMap.get(_resourceNames.get(0)) + .getReplicaMap(new Partition(_partitionNames.get(0))), + Collections.singletonMap(_testInstanceId, "MASTER")); + } + + @Test(dependsOnMethods = "testUpdateAssignment") + public void TestAssignmentFailure() throws IOException { + OptimalAssignment assignment = new OptimalAssignment(); + ClusterModel model = getDefaultClusterModel(); + + // record failure + AssignableReplica targetFailureReplica = + model.getAssignableReplicaMap().get(_resourceNames.get(0)).iterator().next(); + AssignableNode targetFailureNode = model.getAssignableNodes().get(_testInstanceId); + assignment.recordAssignmentFailure(targetFailureReplica, Collections + .singletonMap(targetFailureNode, Collections.singletonList("Assignment Failure!"))); + + Assert.assertTrue(assignment.hasAnyFailure()); + + assignment.updateAssignments(getDefaultClusterModel()); + try { + assignment.getOptimalResourceAssignment(); + Assert.fail("Get optimal assignment shall fail because of the failure record."); + } catch (HelixException ex) { + Assert.assertTrue(ex.getMessage().startsWith( + "Cannot get the optimal resource assignment since a calculation failure is recorded.")); + } + } +} diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java new file mode 100644 index 0000000000..fb5375c2c7 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java @@ -0,0 +1,477 @@ +package org.apache.helix.integration.rebalancer.WagedRebalancer; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * 
or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.helix.ConfigAccessor; +import org.apache.helix.TestHelper; +import org.apache.helix.common.ZkTestBase; +import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy; +import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy; +import org.apache.helix.integration.manager.ClusterControllerManager; +import org.apache.helix.integration.manager.MockParticipantManager; +import org.apache.helix.model.BuiltInStateModelDefinitions; +import org.apache.helix.model.ClusterConfig; +import org.apache.helix.model.ExternalView; +import org.apache.helix.model.IdealState; +import org.apache.helix.model.InstanceConfig; +import org.apache.helix.tools.ClusterVerifiers.HelixClusterVerifier; +import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier; +import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeClass; +import 
org.testng.annotations.Test; + +public class TestWagedRebalance extends ZkTestBase { + protected final int NUM_NODE = 6; + protected static final int START_PORT = 12918; + protected static final int PARTITIONS = 20; + protected static final int TAGS = 2; + + protected final String CLASS_NAME = getShortClassName(); + protected final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME; + protected ClusterControllerManager _controller; + + List _participants = new ArrayList<>(); + Map _nodeToTagMap = new HashMap<>(); + List _nodes = new ArrayList<>(); + private Set _allDBs = new HashSet<>(); + private int _replica = 3; + + private static String[] _testModels = { + BuiltInStateModelDefinitions.OnlineOffline.name(), + BuiltInStateModelDefinitions.MasterSlave.name(), + BuiltInStateModelDefinitions.LeaderStandby.name() + }; + + @BeforeClass + public void beforeClass() throws Exception { + System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis())); + + _gSetupTool.addCluster(CLUSTER_NAME, true); + + for (int i = 0; i < NUM_NODE; i++) { + String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i); + addInstanceConfig(storageNodeName, i, TAGS); + } + + // start dummy participants + for (String node : _nodes) { + MockParticipantManager participant = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, node); + participant.syncStart(); + _participants.add(participant); + } + + // start controller + String controllerName = CONTROLLER_PREFIX + "_0"; + _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName); + _controller.syncStart(); + + enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true); + } + + protected void addInstanceConfig(String storageNodeName, int seqNo, int tagCount) { + _gSetupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName); + String tag = "tag-" + seqNo % tagCount; + _gSetupTool.getClusterManagementTool().addInstanceTag(CLUSTER_NAME, storageNodeName, tag); + 
_nodeToTagMap.put(storageNodeName, tag); + _nodes.add(storageNodeName); + } + + @Test + public void test() throws Exception { + int i = 0; + for (String stateModel : _testModels) { + String db = "Test-DB-" + i++; + createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica, _replica, + -1); + _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica); + _allDBs.add(db); + } + Thread.sleep(300); + + validate(_replica); + + // Adding 3 more resources + i = 0; + for (String stateModel : _testModels) { + String moreDB = "More-Test-DB-" + i++; + createResourceWithWagedRebalance(CLUSTER_NAME, moreDB, stateModel, PARTITIONS, _replica, + _replica, -1); + _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, moreDB, _replica); + _allDBs.add(moreDB); + + Thread.sleep(300); + + validate(_replica); + } + + // Drop the 3 additional resources + for (int j = 0; j < 3; j++) { + String moreDB = "More-Test-DB-" + j++; + _gSetupTool.dropResourceFromCluster(CLUSTER_NAME, moreDB); + _allDBs.remove(moreDB); + + Thread.sleep(300); + + validate(_replica); + } + } + + @Test(dependsOnMethods = "test") + public void testWithInstanceTag() throws Exception { + Set tags = new HashSet(_nodeToTagMap.values()); + int i = 3; + for (String tag : tags) { + String db = "Test-DB-" + i++; + createResourceWithWagedRebalance(CLUSTER_NAME, db, + BuiltInStateModelDefinitions.MasterSlave.name(), PARTITIONS, _replica, _replica, -1); + IdealState is = + _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); + is.setInstanceGroupTag(tag); + _gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, db, is); + _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica); + _allDBs.add(db); + } + Thread.sleep(300); + validate(_replica); + } + + @Test(dependsOnMethods = "test") + public void testChangeIdealState() throws InterruptedException { + String dbName = "Test-DB"; + createResourceWithWagedRebalance(CLUSTER_NAME, dbName, + 
BuiltInStateModelDefinitions.MasterSlave.name(), PARTITIONS, _replica, _replica, -1); + _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, dbName, _replica); + _allDBs.add(dbName); + Thread.sleep(300); + + validate(_replica); + + // Adjust the replica count + IdealState is = + _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, dbName); + int newReplicaFactor = _replica - 1; + is.setReplicas("" + newReplicaFactor); + _gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, dbName, is); + Thread.sleep(300); + + validate(newReplicaFactor); + + // Adjust the partition list + is = _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, dbName); + is.setNumPartitions(PARTITIONS + 1); + _gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, dbName, is); + _gSetupTool.getClusterManagementTool().rebalance(CLUSTER_NAME, dbName, newReplicaFactor); + Thread.sleep(300); + + validate(newReplicaFactor); + ExternalView ev = + _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, dbName); + Assert.assertEquals(ev.getPartitionSet().size(), PARTITIONS + 1); + } + + @Test(dependsOnMethods = "test") + public void testDisableInstance() throws InterruptedException { + String dbName = "Test-DB"; + createResourceWithWagedRebalance(CLUSTER_NAME, dbName, + BuiltInStateModelDefinitions.MasterSlave.name(), PARTITIONS, _replica, _replica, -1); + _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, dbName, _replica); + _allDBs.add(dbName); + Thread.sleep(300); + + validate(_replica); + + // Disable participants, keep only three left + Set disableParticipants = new HashSet<>(); + + try { + for (int i = 3; i < _participants.size(); i++) { + MockParticipantManager p = _participants.get(i); + disableParticipants.add(p.getInstanceName()); + InstanceConfig config = _gSetupTool.getClusterManagementTool() + .getInstanceConfig(CLUSTER_NAME, p.getInstanceName()); + config.setInstanceEnabled(false); + 
_gSetupTool.getClusterManagementTool() + .setInstanceConfig(CLUSTER_NAME, p.getInstanceName(), config); + } + Thread.sleep(300); + + validate(_replica); + + // Verify there is no assignment on the disabled participants. + ExternalView ev = + _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, dbName); + for (String partition : ev.getPartitionSet()) { + Map replicaStateMap = ev.getStateMap(partition); + for (String instance : replicaStateMap.keySet()) { + Assert.assertFalse(disableParticipants.contains(instance)); + } + } + } finally { + // recover the config + for (String instanceName : disableParticipants) { + InstanceConfig config = + _gSetupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME, instanceName); + config.setInstanceEnabled(true); + _gSetupTool.getClusterManagementTool() + .setInstanceConfig(CLUSTER_NAME, instanceName, config); + } + } + } + + @Test(dependsOnMethods = "testDisableInstance") + public void testLackEnoughLiveInstances() throws Exception { + // shutdown participants, keep only two left + for (int i = 2; i < _participants.size(); i++) { + _participants.get(i).syncStop(); + } + + int j = 0; + for (String stateModel : _testModels) { + String db = "Test-DB-" + j++; + createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica, _replica, + -1); + _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica); + _allDBs.add(db); + } + + Thread.sleep(300); + // Verify if the partitions get assigned + validate(2); + + // restart the participants within the zone + for (int i = 2; i < _participants.size(); i++) { + MockParticipantManager p = _participants.get(i); + MockParticipantManager newNode = + new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, p.getInstanceName()); + _participants.set(i, newNode); + newNode.syncStart(); + } + + Thread.sleep(300); + // Verify if the partitions get assigned + validate(_replica); + } + + @Test(dependsOnMethods = "testDisableInstance") + public void 
testLackEnoughInstances() throws Exception { + // shutdown participants, keep only two left + for (int i = 2; i < _participants.size(); i++) { + MockParticipantManager p = _participants.get(i); + p.syncStop(); + _gSetupTool.getClusterManagementTool() + .enableInstance(CLUSTER_NAME, p.getInstanceName(), false); + _gSetupTool.dropInstanceFromCluster(CLUSTER_NAME, p.getInstanceName()); + + } + + int j = 0; + for (String stateModel : _testModels) { + String db = "Test-DB-" + j++; + createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica, _replica, + -1); + _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica); + _allDBs.add(db); + } + + Thread.sleep(300); + // Verify if the partitions get assigned + validate(2); + + // Create new participants within the zone + for (int i = 2; i < _participants.size(); i++) { + MockParticipantManager p = _participants.get(i); + String replaceNodeName = p.getInstanceName() + "-replacement_" + START_PORT; + addInstanceConfig(replaceNodeName, i, TAGS); + MockParticipantManager newNode = + new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, replaceNodeName); + _participants.set(i, newNode); + newNode.syncStart(); + } + + Thread.sleep(300); + // Verify if the partitions get assigned + validate(_replica); + } + + @Test(dependsOnMethods = "test") + public void testMixedRebalancerUsage() throws InterruptedException { + int i = 0; + for (String stateModel : _testModels) { + String db = "Test-DB-" + i++; + if (i == 0) { + _gSetupTool.addResourceToCluster(CLUSTER_NAME, db, PARTITIONS, stateModel, + IdealState.RebalanceMode.FULL_AUTO + "", CrushRebalanceStrategy.class.getName()); + } else if (i == 1) { + _gSetupTool.addResourceToCluster(CLUSTER_NAME, db, PARTITIONS, stateModel, + IdealState.RebalanceMode.FULL_AUTO + "", CrushEdRebalanceStrategy.class.getName()); + } else { + createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica, + _replica, -1); + } + 
_gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica); + _allDBs.add(db); + } + Thread.sleep(300); + + validate(_replica); + } + + @Test(dependsOnMethods = "test") + public void testMaxPartitionLimitation() throws Exception { + ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient); + ClusterConfig clusterConfig = configAccessor.getClusterConfig(CLUSTER_NAME); + // Change the cluster level config so no assignment can be done + clusterConfig.setMaxPartitionsPerInstance(1); + configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig); + try { + String limitedResourceName = null; + int i = 0; + for (String stateModel : _testModels) { + String db = "Test-DB-" + i++; + createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica, + _replica, -1); + if (i == 1) { + // The limited resource has additional limitation, so even the other resources can be assigned + // later, this resource will still be blocked by the max partition limitation. + limitedResourceName = db; + IdealState idealState = + _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); + idealState.setMaxPartitionsPerInstance(1); + _gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, db, idealState); + } + _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica); + _allDBs.add(db); + } + Thread.sleep(300); + + // Since the WAGED rebalancer does not do partial rebalance, the initial assignment won't show. 
+ Assert.assertFalse(TestHelper.verify(() -> _allDBs.stream().allMatch(db -> { + ExternalView ev = + _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); + return ev != null && !ev.getPartitionSet().isEmpty(); + }), 2000)); + + // Remove the cluster level limitation + clusterConfig.setMaxPartitionsPerInstance(-1); + configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig); + Thread.sleep(300); + + // wait until any of the resources is rebalanced + TestHelper.verify(() -> { + for (String db : _allDBs) { + ExternalView ev = + _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); + if (ev != null && !ev.getPartitionSet().isEmpty()) { + return true; + } + } + return false; + }, 3000); + ExternalView ev = _gSetupTool.getClusterManagementTool() + .getResourceExternalView(CLUSTER_NAME, limitedResourceName); + Assert.assertFalse(ev != null && !ev.getPartitionSet().isEmpty()); + + // Remove the resource level limitation + IdealState idealState = _gSetupTool.getClusterManagementTool() + .getResourceIdealState(CLUSTER_NAME, limitedResourceName); + idealState.setMaxPartitionsPerInstance(Integer.MAX_VALUE); + _gSetupTool.getClusterManagementTool() + .setResourceIdealState(CLUSTER_NAME, limitedResourceName, idealState); + + validate(_replica); + } finally { + // recover the config change + clusterConfig = configAccessor.getClusterConfig(CLUSTER_NAME); + clusterConfig.setMaxPartitionsPerInstance(-1); + configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig); + } + } + + private void validate(int expectedReplica) { + HelixClusterVerifier _clusterVerifier = + new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR) + .setResources(_allDBs).build(); + Assert.assertTrue(_clusterVerifier.verify(5000)); + for (String db : _allDBs) { + IdealState is = + _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); + ExternalView ev = + 
_gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); + validateIsolation(is, ev, expectedReplica); + } + } + + /** + * Validate each partition is different instances and with necessary tagged instances. + */ + private void validateIsolation(IdealState is, ExternalView ev, int expectedReplica) { + String tag = is.getInstanceGroupTag(); + for (String partition : is.getPartitionSet()) { + Map assignmentMap = ev.getRecord().getMapField(partition); + Set instancesInEV = assignmentMap.keySet(); + Assert.assertEquals(instancesInEV.size(), expectedReplica); + for (String instance : instancesInEV) { + if (tag != null) { + InstanceConfig config = + _gSetupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME, instance); + Assert.assertTrue(config.containsTag(tag)); + } + } + } + } + + @AfterMethod + public void afterMethod() throws Exception { + for (String db : _allDBs) { + _gSetupTool.dropResourceFromCluster(CLUSTER_NAME, db); + } + _allDBs.clear(); + // waiting for all DB be dropped. 
+ Thread.sleep(100); + ZkHelixClusterVerifier _clusterVerifier = + new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR) + .setResources(_allDBs).build(); + Assert.assertTrue(_clusterVerifier.verifyByPolling()); + } + + @AfterClass + public void afterClass() throws Exception { + if (_controller != null && _controller.isConnected()) { + _controller.syncStop(); + } + for (MockParticipantManager p : _participants) { + if (p != null && p.isConnected()) { + p.syncStop(); + } + } + deleteCluster(CLUSTER_NAME); + } +} diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalanceFaultZone.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalanceFaultZone.java new file mode 100644 index 0000000000..0b020dbebc --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalanceFaultZone.java @@ -0,0 +1,374 @@ +package org.apache.helix.integration.rebalancer.WagedRebalancer; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.helix.ConfigAccessor; +import org.apache.helix.common.ZkTestBase; +import org.apache.helix.integration.manager.ClusterControllerManager; +import org.apache.helix.integration.manager.MockParticipantManager; +import org.apache.helix.model.BuiltInStateModelDefinitions; +import org.apache.helix.model.ClusterConfig; +import org.apache.helix.model.ExternalView; +import org.apache.helix.model.IdealState; +import org.apache.helix.model.InstanceConfig; +import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier; +import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +public class TestWagedRebalanceFaultZone extends ZkTestBase { + protected final int NUM_NODE = 6; + protected static final int START_PORT = 12918; + protected static final int PARTITIONS = 20; + protected static final int ZONES = 3; + protected static final int TAGS = 2; + + protected final String CLASS_NAME = getShortClassName(); + protected final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME; + protected ClusterControllerManager _controller; + + List _participants = new ArrayList<>(); + Map _nodeToZoneMap = new HashMap<>(); + Map _nodeToTagMap = new HashMap<>(); + List _nodes = new ArrayList<>(); + Set _allDBs = new HashSet<>(); + int _replica = 3; + + String[] _testModels = { + BuiltInStateModelDefinitions.OnlineOffline.name(), + BuiltInStateModelDefinitions.MasterSlave.name(), + BuiltInStateModelDefinitions.LeaderStandby.name() + }; + + @BeforeClass + public void beforeClass() throws Exception { + System.out.println("START " + CLASS_NAME + " at " + new 
Date(System.currentTimeMillis())); + + _gSetupTool.addCluster(CLUSTER_NAME, true); + + for (int i = 0; i < NUM_NODE; i++) { + String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i); + addInstanceConfig(storageNodeName, i, ZONES, TAGS); + } + + // start dummy participants + for (String node : _nodes) { + MockParticipantManager participant = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, node); + participant.syncStart(); + _participants.add(participant); + } + + // start controller + String controllerName = CONTROLLER_PREFIX + "_0"; + _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName); + _controller.syncStart(); + + enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true); + enableTopologyAwareRebalance(_gZkClient, CLUSTER_NAME, true); + } + + protected void addInstanceConfig(String storageNodeName, int seqNo, int zoneCount, int tagCount) { + _gSetupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName); + String zone = "zone-" + seqNo % zoneCount; + String tag = "tag-" + seqNo % tagCount; + _gSetupTool.getClusterManagementTool().setInstanceZoneId(CLUSTER_NAME, storageNodeName, zone); + _gSetupTool.getClusterManagementTool().addInstanceTag(CLUSTER_NAME, storageNodeName, tag); + _nodeToZoneMap.put(storageNodeName, zone); + _nodeToTagMap.put(storageNodeName, tag); + _nodes.add(storageNodeName); + } + + @Test + public void testZoneIsolation() throws Exception { + int i = 0; + for (String stateModel : _testModels) { + String db = "Test-DB-" + i++; + createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica, + _replica, -1); + _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica); + _allDBs.add(db); + } + Thread.sleep(300); + + validate(_replica); + } + + @Test + public void testZoneIsolationWithInstanceTag() throws Exception { + Set tags = new HashSet(_nodeToTagMap.values()); + int i = 0; + for (String tag : tags) { + String db = "Test-DB-" + i++; + 
createResourceWithWagedRebalance(CLUSTER_NAME, db,
+          BuiltInStateModelDefinitions.MasterSlave.name(), PARTITIONS, _replica, _replica, -1);
+      IdealState is =
+          _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+      is.setInstanceGroupTag(tag);
+      _gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, db, is);
+      _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica);
+      _allDBs.add(db);
+    }
+    Thread.sleep(300);
+
+    validate(_replica);
+  }
+
+  /**
+   * Stops every participant of one zone, creates one resource per test state model and checks
+   * that each partition is spread over two zones; then restarts the stopped participants under
+   * their original instance names and checks that each partition covers {@code _replica} zones.
+   */
+  @Test(dependsOnMethods = { "testZoneIsolation", "testZoneIsolationWithInstanceTag" })
+  public void testLackEnoughLiveRacks() throws Exception {
+    // shutdown participants within one zone
+    String zone = _nodeToZoneMap.values().iterator().next();
+    for (MockParticipantManager participant : _participants) {
+      if (_nodeToZoneMap.get(participant.getInstanceName()).equals(zone)) {
+        participant.syncStop();
+      }
+    }
+
+    int j = 0;
+    for (String stateModel : _testModels) {
+      String db = "Test-DB-" + j++;
+      createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica,
+          _replica, -1);
+      _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica);
+      _allDBs.add(db);
+    }
+    Thread.sleep(300);
+    // NOTE(review): the literal 2 presumably equals "_replica - 1" (one zone is down) - confirm.
+    validate(2);
+
+    // restart the participants within the zone
+    for (int i = 0; i < _participants.size(); i++) {
+      MockParticipantManager p = _participants.get(i);
+      if (_nodeToZoneMap.get(p.getInstanceName()).equals(zone)) {
+        // A fresh manager object is created for the same instance name and replaces the
+        // stopped one in the participant list.
+        MockParticipantManager newNode =
+            new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, p.getInstanceName());
+        _participants.set(i, newNode);
+        newNode.syncStart();
+      }
+    }
+
+    Thread.sleep(300);
+    // Verify if the partitions get assigned
+    validate(_replica);
+  }
+
+  /**
+   * Same scenario as {@link #testLackEnoughLiveRacks()}, except that the participants of the
+   * chosen zone are stopped, disabled and dropped from the cluster, and are later replaced by
+   * participants registered under new instance names.
+   */
+  @Test(dependsOnMethods = { "testLackEnoughLiveRacks" })
+  public void testLackEnoughRacks() throws Exception {
+    // shutdown participants within one zone
+    String zone = _nodeToZoneMap.values().iterator().next();
+    for (MockParticipantManager participant : _participants) {
+      if (_nodeToZoneMap.get(participant.getInstanceName()).equals(zone)) {
+        participant.syncStop();
+        _gSetupTool.getClusterManagementTool()
+            .enableInstance(CLUSTER_NAME, participant.getInstanceName(), false);
+        Thread.sleep(50);
+        _gSetupTool.dropInstanceFromCluster(CLUSTER_NAME, participant.getInstanceName());
+      }
+    }
+
+    int j = 0;
+    for (String stateModel : _testModels) {
+      String db = "Test-DB-" + j++;
+      createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica,
+          _replica, -1);
+      _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica);
+      _allDBs.add(db);
+    }
+    Thread.sleep(300);
+    // NOTE(review): the literal 2 presumably equals "_replica - 1" (one zone is gone) - confirm.
+    validate(2);
+
+    // Create new participants within the zone
+    int nodeCount = _participants.size();
+    for (int i = 0; i < nodeCount; i++) {
+      MockParticipantManager p = _participants.get(i);
+      if (_nodeToZoneMap.get(p.getInstanceName()).equals(zone)) {
+        String replaceNodeName = p.getInstanceName() + "-replacement_" + START_PORT;
+        // The list index is reused as the sequence number of the replacement instance.
+        addInstanceConfig(replaceNodeName, i, ZONES, TAGS);
+        MockParticipantManager newNode =
+            new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, replaceNodeName);
+        _participants.set(i, newNode);
+        newNode.syncStart();
+      }
+    }
+
+    Thread.sleep(300);
+    // Verify if the partitions get assigned
+    validate(_replica);
+  }
+
+  /**
+   * Creates one resource per test state model, then starts two extra participants configured
+   * with a zone count of {@code ZONES + 1} while the global rebalance preference is set to
+   * EVENNESS=10 / LESS_MOVEMENT=0, and asserts that every new participant hosts at least one
+   * replica. The preference is reset to 1/1 and the extra participants are stopped afterwards,
+   * whether or not the assertions pass.
+   */
+  @Test(dependsOnMethods = { "testZoneIsolation", "testZoneIsolationWithInstanceTag" })
+  public void testAddZone() throws Exception {
+    int i = 0;
+    for (String stateModel : _testModels) {
+      String db = "Test-DB-" + i++;
+      createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica,
+          _replica, -1);
+      _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica);
+      _allDBs.add(db);
+    }
+    Thread.sleep(300);
+
+    validate(_replica);
+
+    // Create new participants within a new zone
+    Set<MockParticipantManager> newNodes = new HashSet<>();
+    Map<String, Integer> newNodeReplicaCount = new HashMap<>();
+
+    ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient);
+
+    try {
+      // Configure the preference so as to allow movements.
+      ClusterConfig clusterConfig = configAccessor.getClusterConfig(CLUSTER_NAME);
+      Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preference = new HashMap<>();
+      preference.put(ClusterConfig.GlobalRebalancePreferenceKey.EVENNESS, 10);
+      preference.put(ClusterConfig.GlobalRebalancePreferenceKey.LESS_MOVEMENT, 0);
+      clusterConfig.setGlobalRebalancePreference(preference);
+      configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
+
+      int nodeCount = 2;
+      for (int j = 0; j < nodeCount; j++) {
+        String newNodeName = "new-zone-node-" + j + "_" + START_PORT;
+        // Add all new nodes to the new zone
+        // NOTE(review): the addInstanceConfig visible in TestWagedRebalanceTopologyAware derives
+        // the zone as "zone-" + seqNo % zoneCount, which for j = 0, 1 gives zone-0 / zone-1
+        // rather than a new zone - confirm this call really places the nodes in a new zone.
+        addInstanceConfig(newNodeName, j, ZONES + 1, TAGS);
+        MockParticipantManager newNode =
+            new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, newNodeName);
+        newNode.syncStart();
+        newNodes.add(newNode);
+        newNodeReplicaCount.put(newNodeName, 0);
+      }
+      Thread.sleep(300);
+
+      validate(_replica);
+
+      // The new zone nodes shall have some assignments
+      for (String db : _allDBs) {
+        IdealState is =
+            _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+        ExternalView ev =
+            _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
+        validateZoneAndTagIsolation(is, ev, _replica);
+        for (String partition : ev.getPartitionSet()) {
+          Map<String, String> stateMap = ev.getStateMap(partition);
+          for (String node : stateMap.keySet()) {
+            // computeIfPresent is a no-op for nodes that are not tracked, so no separate
+            // containsKey check is needed.
+            newNodeReplicaCount.computeIfPresent(node, (nodeName, replicaCount) -> replicaCount + 1);
+          }
+        }
+      }
+      Assert.assertTrue(newNodeReplicaCount.values().stream().allMatch(count -> count > 0));
+    } finally {
+      // Revert the preference
+      ClusterConfig clusterConfig = configAccessor.getClusterConfig(CLUSTER_NAME);
+      Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preference = new HashMap<>();
+      preference.put(ClusterConfig.GlobalRebalancePreferenceKey.EVENNESS, 1);
+      preference.put(ClusterConfig.GlobalRebalancePreferenceKey.LESS_MOVEMENT, 1);
+      clusterConfig.setGlobalRebalancePreference(preference);
+      configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
+      // Stop the new nodes
+      for (MockParticipantManager p : newNodes) {
+        if (p != null && p.isConnected()) {
+          p.syncStop();
+        }
+      }
+    }
+  }
+
+  /**
+   * Asserts that the strict external-view verifier passes for all tracked resources, then checks
+   * zone and tag isolation of every resource.
+   *
+   * @param expectedReplica number of distinct zones each partition is expected to cover
+   */
+  private void validate(int expectedReplica) {
+    ZkHelixClusterVerifier clusterVerifier =
+        new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
+            .setResources(_allDBs).build();
+    Assert.assertTrue(clusterVerifier.verifyByPolling());
+
+    for (String db : _allDBs) {
+      IdealState is =
+          _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+      ExternalView ev =
+          _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
+      validateZoneAndTagIsolation(is, ev, expectedReplica);
+    }
+  }
+
+  /**
+   * Validate instances for each partition is on different zone and with necessary tagged instances.
+   *
+   * @param is ideal state of the resource; supplies the partition set and the instance group tag
+   * @param ev external view of the resource; supplies the instances hosting each partition
+   * @param expectedReplica number of distinct zones each partition must be placed in
+   */
+  private void validateZoneAndTagIsolation(IdealState is, ExternalView ev, int expectedReplica) {
+    String tag = is.getInstanceGroupTag();
+    for (String partition : is.getPartitionSet()) {
+      Set<String> assignedZones = new HashSet<>();
+
+      Map<String, String> assignmentMap = ev.getRecord().getMapField(partition);
+      Set<String> instancesInEV = assignmentMap.keySet();
+      // TODO: preference List is not persisted in IS.
+      // Assert.assertEquals(instancesInEV, instancesInIs);
+      for (String instance : instancesInEV) {
+        assignedZones.add(_nodeToZoneMap.get(instance));
+        // A tag check is only made when the resource carries an instance group tag.
+        if (tag != null) {
+          InstanceConfig config =
+              _gSetupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME, instance);
+          Assert.assertTrue(config.containsTag(tag));
+        }
+      }
+      Assert.assertEquals(assignedZones.size(), expectedReplica);
+    }
+  }
+
+  /**
+   * Drops every resource created by the finished test method and clears the tracking collection.
+   */
+  @AfterMethod
+  public void afterMethod() throws Exception {
+    for (String db : _allDBs) {
+      _gSetupTool.dropResourceFromCluster(CLUSTER_NAME, db);
+    }
+    _allDBs.clear();
+    // waiting for all DB be dropped.
+    Thread.sleep(100);
+    // _allDBs has just been cleared, so the verifier below is built with an empty resource set.
+    ZkHelixClusterVerifier clusterVerifier =
+        new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
+            .setResources(_allDBs).build();
+    Assert.assertTrue(clusterVerifier.verifyByPolling());
+  }
+
+  /**
+   * Stops the controller and all still-connected participants, then deletes the test cluster.
+   */
+  @AfterClass
+  public void afterClass() throws Exception {
+    /*
+     * shutdown order: 1) disconnect the controller 2) disconnect participants
+     */
+    if (_controller != null && _controller.isConnected()) {
+      _controller.syncStop();
+    }
+    for (MockParticipantManager p : _participants) {
+      if (p != null && p.isConnected()) {
+        p.syncStop();
+      }
+    }
+    deleteCluster(CLUSTER_NAME);
+    System.out.println("END " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
+  }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalanceTopologyAware.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalanceTopologyAware.java
new file mode 100644
index 0000000000..412fc8c1ff
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalanceTopologyAware.java
@@ -0,0 +1,114 @@
+package org.apache.helix.integration.rebalancer.WagedRebalancer;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.Date; + +import org.apache.helix.ConfigAccessor; +import org.apache.helix.integration.manager.ClusterControllerManager; +import org.apache.helix.integration.manager.MockParticipantManager; +import org.apache.helix.model.ClusterConfig; +import org.apache.helix.model.InstanceConfig; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +public class TestWagedRebalanceTopologyAware extends TestWagedRebalanceFaultZone { + private static final String TOLOPOGY_DEF = "/DOMAIN/ZONE/INSTANCE"; + private static final String DOMAIN_NAME = "Domain"; + private static final String FAULT_ZONE = "ZONE"; + + protected final String CLASS_NAME = getShortClassName(); + protected final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME; + + @BeforeClass + public void beforeClass() throws Exception { + System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis())); + + _gSetupTool.addCluster(CLUSTER_NAME, true); + + ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient); + ClusterConfig clusterConfig = configAccessor.getClusterConfig(CLUSTER_NAME); + clusterConfig.setTopology(TOLOPOGY_DEF); + clusterConfig.setFaultZoneType(FAULT_ZONE); + configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig); + + for (int i = 0; i < NUM_NODE; i++) { + String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i); + addInstanceConfig(storageNodeName, i, ZONES, TAGS); + } + + // start dummy participants + for (String node : _nodes) { + MockParticipantManager participant = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, node); + participant.syncStart(); + _participants.add(participant); + } + + // start controller + String controllerName = CONTROLLER_PREFIX + "_0"; + _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName); + _controller.syncStart(); + + 
enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true); + enableTopologyAwareRebalance(_gZkClient, CLUSTER_NAME, true); + } + + protected void addInstanceConfig(String storageNodeName, int seqNo, int zoneCount, int tagCount) { + _gSetupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName); + String zone = "zone-" + seqNo % zoneCount; + String tag = "tag-" + seqNo % tagCount; + + InstanceConfig config = + _gSetupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME, storageNodeName); + config.setDomain( + String.format("DOMAIN=%s,ZONE=%s,INSTANCE=%s", DOMAIN_NAME, zone, storageNodeName)); + config.addTag(tag); + _gSetupTool.getClusterManagementTool().setInstanceConfig(CLUSTER_NAME, storageNodeName, config); + + _nodeToZoneMap.put(storageNodeName, zone); + _nodeToTagMap.put(storageNodeName, tag); + _nodes.add(storageNodeName); + } + + @Test + public void testZoneIsolation() throws Exception { + super.testZoneIsolation(); + } + + @Test + public void testZoneIsolationWithInstanceTag() throws Exception { + super.testZoneIsolationWithInstanceTag(); + } + + @Test(dependsOnMethods = { "testZoneIsolation", "testZoneIsolationWithInstanceTag" }) + public void testLackEnoughLiveRacks() throws Exception { + super.testLackEnoughLiveRacks(); + } + + @Test(dependsOnMethods = { "testLackEnoughLiveRacks" }) + public void testLackEnoughRacks() throws Exception { + super.testLackEnoughRacks(); + } + + @Test(dependsOnMethods = { "testZoneIsolation", "testZoneIsolationWithInstanceTag" }) + public void testAddZone() throws Exception { + super.testAddZone(); + } +}