From f70be0d9dd11d81939e35047c84674cec7b28f2e Mon Sep 17 00:00:00 2001 From: Jiajun Wang Date: Wed, 11 Sep 2019 16:59:33 -0700 Subject: [PATCH 1/5] Integrate the WAGED rebalancer with all the related components. 1. Integrate with algorithm, assignment metadata store etc. Fix several conflicting interfaces and logic so that all the rebalancer components run correctly. 2. Complete OptimalAssignment. 3. Add integration tests for the correctness of the rebalance logic. --- .../apache/helix/HelixRebalanceException.java | 1 + .../waged/AssignmentMetadataStore.java | 54 +- .../rebalancer/waged/WagedRebalancer.java | 248 ++++++--- .../constraints/ConstraintBasedAlgorithm.java | 91 +++- .../NodeMaxPartitionLimitConstraint.java | 9 +- .../waged/model/AssignableNode.java | 38 +- .../waged/model/AssignableReplica.java | 12 +- .../waged/model/ClusterModelProvider.java | 30 ++ .../waged/model/OptimalAssignment.java | 52 +- .../stages/BestPossibleStateCalcStage.java | 153 ++++-- .../manager/zk/ZkBucketDataAccessor.java | 3 +- .../org/apache/helix/common/ZkTestBase.java | 18 +- .../waged/MockAssignmentMetadataStore.java | 9 +- .../waged/TestAssignmentMetadataStore.java | 20 +- .../rebalancer/waged/TestWagedRebalancer.java | 33 +- .../constraints/MockRebalanceAlgorithm.java | 2 +- .../waged/model/AbstractTestClusterModel.java | 2 +- .../waged/model/TestOptimalAssignment.java | 91 ++++ .../WagedRebalancer/TestWagedRebalance.java | 476 ++++++++++++++++++ .../TestWagedRebalanceFaultZone.java | 405 +++++++++++++++ .../TestWagedRebalanceTopologyAware.java | 114 +++++ 21 files changed, 1631 insertions(+), 230 deletions(-) create mode 100644 helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestOptimalAssignment.java create mode 100644 helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java create mode 100644 helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalanceFaultZone.java create 
mode 100644 helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalanceTopologyAware.java diff --git a/helix-core/src/main/java/org/apache/helix/HelixRebalanceException.java b/helix-core/src/main/java/org/apache/helix/HelixRebalanceException.java index a8b5055fe8..b90a7d80cb 100644 --- a/helix-core/src/main/java/org/apache/helix/HelixRebalanceException.java +++ b/helix-core/src/main/java/org/apache/helix/HelixRebalanceException.java @@ -27,6 +27,7 @@ public enum Type { INVALID_CLUSTER_STATUS, INVALID_REBALANCER_STATUS, FAILED_TO_CALCULATE, + INVALID_INPUT, UNKNOWN_FAILURE } diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java index fd655d13ad..a540ffbca8 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java @@ -20,13 +20,15 @@ */ import java.io.IOException; -import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.Map; + +import com.google.common.annotations.VisibleForTesting; +import org.I0Itec.zkclient.exception.ZkNoNodeException; import org.I0Itec.zkclient.serialize.ZkSerializer; import org.apache.helix.BucketDataAccessor; import org.apache.helix.HelixException; -import org.apache.helix.HelixManager; import org.apache.helix.HelixProperty; import org.apache.helix.ZNRecord; import org.apache.helix.manager.zk.ZNRecordJacksonSerializer; @@ -50,23 +52,27 @@ public class AssignmentMetadataStore { private Map _globalBaseline; private Map _bestPossibleAssignment; + AssignmentMetadataStore(String metadataStoreAddrs, String clusterName) { + this(new ZkBucketDataAccessor(metadataStoreAddrs), clusterName); + } + AssignmentMetadataStore(BucketDataAccessor bucketDataAccessor, 
String clusterName) { _dataAccessor = bucketDataAccessor; _baselinePath = String.format(BASELINE_TEMPLATE, clusterName, ASSIGNMENT_METADATA_KEY); _bestPossiblePath = String.format(BEST_POSSIBLE_TEMPLATE, clusterName, ASSIGNMENT_METADATA_KEY); } - AssignmentMetadataStore(HelixManager helixManager) { - this(new ZkBucketDataAccessor(helixManager.getMetadataStoreConnectionString()), - helixManager.getClusterName()); - } - public Map getBaseline() { // Return the in-memory baseline. If null, read from ZK. This is to minimize reads from ZK if (_globalBaseline == null) { - HelixProperty baseline = - _dataAccessor.compressedBucketRead(_baselinePath, HelixProperty.class); - _globalBaseline = splitAssignments(baseline); + try { + HelixProperty baseline = + _dataAccessor.compressedBucketRead(_baselinePath, HelixProperty.class); + _globalBaseline = splitAssignments(baseline); + } catch (ZkNoNodeException ex) { + // Metadata does not exist, so return an empty map + _globalBaseline = Collections.emptyMap(); + } } return _globalBaseline; } @@ -74,9 +80,14 @@ public Map getBaseline() { public Map getBestPossibleAssignment() { // Return the in-memory baseline. If null, read from ZK. This is to minimize reads from ZK if (_bestPossibleAssignment == null) { - HelixProperty baseline = - _dataAccessor.compressedBucketRead(_bestPossiblePath, HelixProperty.class); - _bestPossibleAssignment = splitAssignments(baseline); + try { + HelixProperty baseline = + _dataAccessor.compressedBucketRead(_bestPossiblePath, HelixProperty.class); + _bestPossibleAssignment = splitAssignments(baseline); + } catch (ZkNoNodeException ex) { + // Metadata does not exist, so return an empty map + _bestPossibleAssignment = Collections.emptyMap(); + } } return _bestPossibleAssignment; } @@ -113,6 +124,16 @@ public void persistBestPossibleAssignment( _bestPossibleAssignment = bestPossibleAssignment; } + protected void finalize() { + // To ensure all resources are released. 
+ close(); + } + + // Close to release all the resources. + public void close() { + _dataAccessor.disconnect(); + } + /** * Produces one HelixProperty that contains all assignment data. * @param name @@ -123,8 +144,9 @@ private HelixProperty combineAssignments(String name, Map assignmentMap) { HelixProperty property = new HelixProperty(name); // Add each resource's assignment as a simple field in one ZNRecord + // Node that don't use Arrays.toString() for the record converting. The deserialize will fail. assignmentMap.forEach((resource, assignment) -> property.getRecord().setSimpleField(resource, - Arrays.toString(SERIALIZER.serialize(assignment.getRecord())))); + new String(SERIALIZER.serialize(assignment.getRecord())))); return property; } @@ -138,8 +160,8 @@ private Map splitAssignments(HelixProperty property) // Convert each resource's assignment String into a ResourceAssignment object and put it in a // map property.getRecord().getSimpleFields() - .forEach((resource, assignment) -> assignmentMap.put(resource, - new ResourceAssignment((ZNRecord) SERIALIZER.deserialize(assignment.getBytes())))); + .forEach((resource, assignmentStr) -> assignmentMap.put(resource, + new ResourceAssignment((ZNRecord) SERIALIZER.deserialize(assignmentStr.getBytes())))); return assignmentMap; } } diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java index 551239d6ee..1861e107a3 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java @@ -29,6 +29,7 @@ import java.util.stream.Collectors; import org.apache.helix.HelixConstants; +import org.apache.helix.HelixException; import org.apache.helix.HelixManager; import org.apache.helix.HelixRebalanceException; import 
org.apache.helix.controller.changedetector.ResourceChangeDetector; @@ -64,27 +65,34 @@ public class WagedRebalancer { // When any of the following change happens, the rebalancer needs to do a global rebalance which // contains 1. baseline recalculate, 2. partial rebalance that is based on the new baseline. private static final Set GLOBAL_REBALANCE_REQUIRED_CHANGE_TYPES = - ImmutableSet.of(HelixConstants.ChangeType.RESOURCE_CONFIG, - HelixConstants.ChangeType.CLUSTER_CONFIG, HelixConstants.ChangeType.INSTANCE_CONFIG); + ImmutableSet.of( + HelixConstants.ChangeType.RESOURCE_CONFIG, + HelixConstants.ChangeType.CLUSTER_CONFIG, + HelixConstants.ChangeType.INSTANCE_CONFIG); // The cluster change detector is a stateful object. // Make it static to avoid unnecessary reinitialization. private static final ThreadLocal CHANGE_DETECTOR_THREAD_LOCAL = new ThreadLocal<>(); private final MappingCalculator _mappingCalculator; - - // --------- The following fields are placeholders and need replacement. -----------// - // TODO Shall we make the metadata store a static threadlocal object as well to avoid - // reinitialization? 
private final AssignmentMetadataStore _assignmentMetadataStore; private final RebalanceAlgorithm _rebalanceAlgorithm; - // ------------------------------------------------------------------------------------// + + private static AssignmentMetadataStore constructAssignmentStore(HelixManager helixManager) { + AssignmentMetadataStore assignmentMetadataStore = null; + if (helixManager != null) { + String metadataStoreAddrs = helixManager.getMetadataStoreConnectionString(); + String clusterName = helixManager.getClusterName(); + if (metadataStoreAddrs != null && clusterName != null) { + assignmentMetadataStore = new AssignmentMetadataStore(metadataStoreAddrs, clusterName); + } + } + return assignmentMetadataStore; + } public WagedRebalancer(HelixManager helixManager, Map preferences) { - this( - // TODO init the metadata store according to their requirement when integrate, - // or change to final static method if possible. - new AssignmentMetadataStore(helixManager), ConstraintBasedAlgorithmFactory.getInstance(preferences), + this(constructAssignmentStore(helixManager), + ConstraintBasedAlgorithmFactory.getInstance(preferences), // Use DelayedAutoRebalancer as the mapping calculator for the final assignment output. // Mapping calculator will translate the best possible assignment into the applicable state // mapping based on the current states. @@ -94,6 +102,10 @@ public WagedRebalancer(HelixManager helixManager, private WagedRebalancer(AssignmentMetadataStore assignmentMetadataStore, RebalanceAlgorithm algorithm, MappingCalculator mappingCalculator) { + if (assignmentMetadataStore == null) { + LOG.warn("Assignment Metadata Store is not configured properly." 
+ + " The rebalancer will not access the assignment store during the rebalance."); + } _assignmentMetadataStore = assignmentMetadataStore; _rebalanceAlgorithm = algorithm; _mappingCalculator = mappingCalculator; @@ -103,7 +115,13 @@ private WagedRebalancer(AssignmentMetadataStore assignmentMetadataStore, protected WagedRebalancer(AssignmentMetadataStore assignmentMetadataStore, RebalanceAlgorithm algorithm) { this(assignmentMetadataStore, algorithm, new DelayedAutoRebalancer()); + } + // Release all the resources. + public void close() { + if (_assignmentMetadataStore != null) { + _assignmentMetadataStore.close(); + } } /** @@ -117,27 +135,18 @@ protected WagedRebalancer(AssignmentMetadataStore assignmentMetadataStore, public Map computeNewIdealStates(ResourceControllerDataProvider clusterData, Map resourceMap, final CurrentStateOutput currentStateOutput) throws HelixRebalanceException { - LOG.info("Start computing new ideal states for resources: {}", resourceMap.keySet().toString()); - - // Find the compatible resources: 1. FULL_AUTO 2. 
Configured to use the WAGED rebalancer - resourceMap = resourceMap.entrySet().stream().filter(resourceEntry -> { - IdealState is = clusterData.getIdealState(resourceEntry.getKey()); - return is != null && is.getRebalanceMode().equals(IdealState.RebalanceMode.FULL_AUTO) - && getClass().getName().equals(is.getRebalancerClassName()); - }).collect(Collectors.toMap(resourceEntry -> resourceEntry.getKey(), - resourceEntry -> resourceEntry.getValue())); - if (resourceMap.isEmpty()) { - LOG.warn("There is no valid resource to be rebalanced by {}", + LOG.warn("There is no resource to be rebalanced by {}", this.getClass().getSimpleName()); return Collections.emptyMap(); - } else { - LOG.info("Valid resources that will be rebalanced by {}: {}", this.getClass().getSimpleName(), - resourceMap.keySet().toString()); } + LOG.info("Start computing new ideal states for resources: {}", resourceMap.keySet().toString()); + validateInput(clusterData, resourceMap); + // Calculate the target assignment based on the current cluster status. - Map newIdealStates = computeBestPossibleStates(clusterData, resourceMap); + Map newIdealStates = + computeBestPossibleStates(clusterData, resourceMap, currentStateOutput); // Construct the new best possible states according to the current state and target assignment. // Note that the new ideal state might be an intermediate state between the current state and @@ -166,28 +175,29 @@ public Map computeNewIdealStates(ResourceControllerDataProvi // Coordinate baseline recalculation and partial rebalance according to the cluster changes. 
private Map computeBestPossibleStates( - ResourceControllerDataProvider clusterData, Map resourceMap) - throws HelixRebalanceException { + ResourceControllerDataProvider clusterData, Map resourceMap, + final CurrentStateOutput currentStateOutput) throws HelixRebalanceException { getChangeDetector().updateSnapshots(clusterData); - // Get all the modified and new items' information + // Get all the changed items' information Map> clusterChanges = getChangeDetector().getChangeTypes().stream() .collect(Collectors.toMap(changeType -> changeType, changeType -> { Set itemKeys = new HashSet<>(); itemKeys.addAll(getChangeDetector().getAdditionsByType(changeType)); itemKeys.addAll(getChangeDetector().getChangesByType(changeType)); + itemKeys.addAll(getChangeDetector().getRemovalsByType(changeType)); return itemKeys; })); if (clusterChanges.keySet().stream() .anyMatch(changeType -> GLOBAL_REBALANCE_REQUIRED_CHANGE_TYPES.contains(changeType))) { - refreshBaseline(clusterData, clusterChanges, resourceMap); + refreshBaseline(clusterData, clusterChanges, resourceMap, currentStateOutput); // Inject a cluster config change for large scale partial rebalance once the baseline changed. clusterChanges.putIfAbsent(HelixConstants.ChangeType.CLUSTER_CONFIG, Collections.emptySet()); } Map newAssignment = - partialRebalance(clusterData, clusterChanges, resourceMap); + partialRebalance(clusterData, clusterChanges, resourceMap, currentStateOutput); // Convert the assignments into IdealState for the following state mapping calculation. 
Map finalIdealState = new HashMap<>(); @@ -213,56 +223,60 @@ private Map computeBestPossibleStates( // TODO make the Baseline calculation async if complicated algorithm is used for the Baseline private void refreshBaseline(ResourceControllerDataProvider clusterData, - Map> clusterChanges, Map resourceMap) - throws HelixRebalanceException { + Map> clusterChanges, Map resourceMap, + final CurrentStateOutput currentStateOutput) throws HelixRebalanceException { + LOG.info("Start calculating the new baseline."); + Map currentBaseline = + getBaselineAssignment(_assignmentMetadataStore, currentStateOutput, resourceMap.keySet()); // For baseline calculation // 1. Ignore node status (disable/offline). // 2. Use the baseline as the previous best possible assignment since there is no "baseline" for // the baseline. - LOG.info("Start calculating the new baseline."); - Map currentBaseline; - try { - currentBaseline = _assignmentMetadataStore.getBaseline(); - } catch (Exception ex) { - throw new HelixRebalanceException("Failed to get the current baseline assignment.", - HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex); - } - Map baseline = calculateAssignment(clusterData, clusterChanges, - resourceMap, clusterData.getAllInstances(), Collections.emptyMap(), currentBaseline); - try { - _assignmentMetadataStore.persistBaseline(baseline); - } catch (Exception ex) { - throw new HelixRebalanceException("Failed to persist the new baseline assignment.", - HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex); + Map newBaseline = + calculateAssignment(clusterData, clusterChanges, resourceMap, clusterData.getAllInstances(), + Collections.emptyMap(), currentBaseline); + + if (_assignmentMetadataStore != null) { + try { + _assignmentMetadataStore.persistBaseline(newBaseline); + } catch (Exception ex) { + throw new HelixRebalanceException("Failed to persist the new baseline assignment.", + HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex); + } + } else { + 
LOG.debug("Assignment Metadata Store is empty. Skip persist the baseline assignment."); } + LOG.info("Finish calculating the new baseline."); } private Map partialRebalance( ResourceControllerDataProvider clusterData, - Map> clusterChanges, Map resourceMap) - throws HelixRebalanceException { + Map> clusterChanges, Map resourceMap, + final CurrentStateOutput currentStateOutput) throws HelixRebalanceException { LOG.info("Start calculating the new best possible assignment."); - Set activeInstances = clusterData.getEnabledLiveInstances(); - Map baseline; - Map prevBestPossibleAssignment; - try { - baseline = _assignmentMetadataStore.getBaseline(); - prevBestPossibleAssignment = _assignmentMetadataStore.getBestPossibleAssignment(); - } catch (Exception ex) { - throw new HelixRebalanceException("Failed to get the persisted assignment records.", - HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex); - } - Map newAssignment = calculateAssignment(clusterData, clusterChanges, - resourceMap, activeInstances, baseline, prevBestPossibleAssignment); - try { - // TODO Test to confirm if persisting the final assignment (with final partition states) - // would be a better option. 
- _assignmentMetadataStore.persistBestPossibleAssignment(newAssignment); - } catch (Exception ex) { - throw new HelixRebalanceException("Failed to persist the new best possible assignment.", - HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex); + Map currentBaseline = + getBaselineAssignment(_assignmentMetadataStore, currentStateOutput, resourceMap.keySet()); + Map currentBestPossibleAssignment = + getBestPossibleAssignment(_assignmentMetadataStore, currentStateOutput, + resourceMap.keySet()); + Map newAssignment = + calculateAssignment(clusterData, clusterChanges, resourceMap, + clusterData.getEnabledLiveInstances(), currentBaseline, currentBestPossibleAssignment); + + if (_assignmentMetadataStore != null) { + try { + // TODO Test to confirm if persisting the final assignment (with final partition states) + // would be a better option. + _assignmentMetadataStore.persistBestPossibleAssignment(newAssignment); + } catch (Exception ex) { + throw new HelixRebalanceException("Failed to persist the new best possible assignment.", + HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex); + } + } else { + LOG.debug("Assignment Metadata Store is empty. 
Skip persist the baseline assignment."); } + LOG.info("Finish calculating the new best possible assignment."); return newAssignment; } @@ -348,4 +362,100 @@ private Map> getPreferenceLists(ResourceAssignment newAssig } return preferenceList; } + + private void validateInput(ResourceControllerDataProvider clusterData, + Map resourceMap) throws HelixRebalanceException { + Set nonCompatibleResources = resourceMap.entrySet().stream().filter(resourceEntry -> { + IdealState is = clusterData.getIdealState(resourceEntry.getKey()); + return is == null || !is.getRebalanceMode().equals(IdealState.RebalanceMode.FULL_AUTO) + || !getClass().getName().equals(is.getRebalancerClassName()); + }).map(Map.Entry::getKey).collect(Collectors.toSet()); + if (!nonCompatibleResources.isEmpty()) { + throw new HelixRebalanceException(String.format( + "Input contains invalid resource(s) that cannot be rebalanced by the WAGED rebalancer. %s", + nonCompatibleResources.toString()), HelixRebalanceException.Type.INVALID_INPUT); + } + } + + /** + * @param assignmentMetadataStore + * @param currentStateOutput + * @param resources + * @return The current baseline assignment. If record does not exist in the + * assignmentMetadataStore, return the current state assignment. + * @throws HelixRebalanceException + */ + private Map getBaselineAssignment( + AssignmentMetadataStore assignmentMetadataStore, CurrentStateOutput currentStateOutput, + Set resources) throws HelixRebalanceException { + Map currentBaseline = Collections.emptyMap(); + if (assignmentMetadataStore != null) { + try { + currentBaseline = assignmentMetadataStore.getBaseline(); + } catch (HelixException ex) { + // Report error. and use empty mapping instead. 
+ LOG.error("Failed to get the current baseline assignment.", ex); + } catch (Exception ex) { + throw new HelixRebalanceException( + "Failed to get the current baseline assignment because of unexpected error.", + HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex); + } + } + if (currentBaseline.isEmpty()) { + LOG.warn( + "The current baseline assignment record is empty. Use the current states instead."); + currentBaseline = getCurrentStateAssingment(currentStateOutput, resources); + } + return currentBaseline; + } + + /** + * @param assignmentMetadataStore + * @param currentStateOutput + * @param resources + * @return The current best possible assignment. If record does not exist in the + * assignmentMetadataStore, return the current state assignment. + * @throws HelixRebalanceException + */ + private Map getBestPossibleAssignment( + AssignmentMetadataStore assignmentMetadataStore, CurrentStateOutput currentStateOutput, + Set resources) throws HelixRebalanceException { + Map currentBestAssignment = Collections.emptyMap(); + if (assignmentMetadataStore != null) { + try { + currentBestAssignment = assignmentMetadataStore.getBestPossibleAssignment(); + } catch (HelixException ex) { + // Report error. and use empty mapping instead. + LOG.error("Failed to get the current best possible assignment.", ex); + } catch (Exception ex) { + throw new HelixRebalanceException( + "Failed to get the current best possible assignment because of unexpected error.", + HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex); + } + } + if (currentBestAssignment.isEmpty()) { + LOG.warn( + "The current best possible assignment record is empty. 
Use the current states instead."); + currentBestAssignment = getCurrentStateAssingment(currentStateOutput, resources); + } + return currentBestAssignment; + } + + private Map getCurrentStateAssingment( + CurrentStateOutput currentStateOutput, Set resourceSet) { + Map currentStateAssignment = new HashMap<>(); + for (String resourceName : resourceSet) { + Map> currentStateMap = + currentStateOutput.getCurrentStateMap(resourceName); + if (!currentStateMap.isEmpty()) { + ResourceAssignment newResourceAssignment = new ResourceAssignment(resourceName); + currentStateMap.entrySet().stream().forEach(currentStateEntry -> { + newResourceAssignment + .addReplicaMap(currentStateEntry.getKey(), currentStateEntry.getValue()); + }); + currentStateAssignment.put(resourceName, newResourceAssignment); + } + } + return currentStateAssignment; + } } diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java index 89a3f29662..de6f23c106 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java @@ -29,6 +29,7 @@ import java.util.function.Function; import java.util.stream.Collectors; +import com.google.common.collect.Maps; import org.apache.helix.HelixRebalanceException; import org.apache.helix.controller.rebalancer.waged.RebalanceAlgorithm; import org.apache.helix.controller.rebalancer.waged.model.AssignableNode; @@ -36,11 +37,10 @@ import org.apache.helix.controller.rebalancer.waged.model.ClusterContext; import org.apache.helix.controller.rebalancer.waged.model.ClusterModel; import org.apache.helix.controller.rebalancer.waged.model.OptimalAssignment; +import org.apache.helix.model.ResourceAssignment; import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; -import com.google.common.collect.Maps; - /** * The algorithm is based on a given set of constraints * - HardConstraint: Approve or deny the assignment given its condition, any assignment cannot @@ -64,29 +64,26 @@ class ConstraintBasedAlgorithm implements RebalanceAlgorithm { @Override public OptimalAssignment calculate(ClusterModel clusterModel) throws HelixRebalanceException { OptimalAssignment optimalAssignment = new OptimalAssignment(); - Map> replicasByResource = clusterModel.getAssignableReplicaMap(); List nodes = new ArrayList<>(clusterModel.getAssignableNodes().values()); - - // TODO: different orders of resource/replica could lead to different greedy assignments, will - // revisit and improve the performance - for (String resource : replicasByResource.keySet()) { - for (AssignableReplica replica : replicasByResource.get(resource)) { - Optional maybeBestNode = - getNodeWithHighestPoints(replica, nodes, clusterModel.getContext(), optimalAssignment); - // stop immediately if any replica cannot find best assignable node - if (optimalAssignment.hasAnyFailure()) { - String errorMessage = String.format( - "Unable to find any available candidate node for partition %s; Fail reasons: %s", - replica.getPartitionName(), optimalAssignment.getFailures()); - throw new HelixRebalanceException(errorMessage, - HelixRebalanceException.Type.FAILED_TO_CALCULATE); - } - maybeBestNode.ifPresent(node -> clusterModel.assign(replica.getResourceName(), - replica.getPartitionName(), replica.getReplicaState(), node.getInstanceName())); + // Sort the replicas so the input is stable for the greedy algorithm. + // For the other algorithm implementation, this sorting could be unnecessary. 
+ for (AssignableReplica replica : getOrderedAssignableReplica(clusterModel)) { + Optional maybeBestNode = + getNodeWithHighestPoints(replica, nodes, clusterModel.getContext(), optimalAssignment); + // stop immediately if any replica cannot find best assignable node + if (optimalAssignment.hasAnyFailure()) { + String errorMessage = String.format( + "Unable to find any available candidate node for partition %s; Fail reasons: %s", + replica.getPartitionName(), optimalAssignment.getFailures()); + throw new HelixRebalanceException(errorMessage, + HelixRebalanceException.Type.FAILED_TO_CALCULATE); } + maybeBestNode.ifPresent(node -> clusterModel + .assign(replica.getResourceName(), replica.getPartitionName(), replica.getReplicaState(), + node.getInstanceName())); } - - return optimalAssignment.convertFrom(clusterModel); + optimalAssignment.updateAssignments(clusterModel); + return optimalAssignment; } private Optional getNodeWithHighestPoints(AssignableReplica replica, @@ -133,4 +130,54 @@ private List convertFailureReasons(List hardConstraints) return hardConstraints.stream().map(HardConstraint::getDescription) .collect(Collectors.toList()); } + + // TODO investigate better ways to sort replicas. One option is sorting based on the creation time. + private List getOrderedAssignableReplica(ClusterModel clusterModel) { + Map> replicasByResource = clusterModel.getAssignableReplicaMap(); + List orderedAssignableReplicas = + replicasByResource.values().stream().flatMap(replicas -> replicas.stream()) + .collect(Collectors.toList()); + + Map bestPossibleAssignment = + clusterModel.getContext().getBestPossibleAssignment(); + Map baselineAssignment = + clusterModel.getContext().getBaselineAssignment(); + + // 1. Sort according if the assignment exists in the best possible and/or baseline assignment + // 2. Sort according to the state priority. Note that prioritizing the top state is required. + // Or the greedy algorithm will unnecessarily shuffle the states between replicas. 
+ // 3. Sort according to the resource/partition name. + orderedAssignableReplicas.sort((replica1, replica2) -> { + String resourceName1 = replica1.getResourceName(); + String resourceName2 = replica2.getResourceName(); + if (bestPossibleAssignment.containsKey(resourceName1) == bestPossibleAssignment + .containsKey(resourceName2)) { + if (baselineAssignment.containsKey(resourceName1) == baselineAssignment + .containsKey(resourceName2)) { + // If both assignment states have/not have the resource assignment the same, + // compare for additional dimensions. + int statePriority1 = replica1.getStatePriority(); + int statePriority2 = replica2.getStatePriority(); + if (statePriority1 == statePriority2) { + // If state prioritizes are the same, compare the names. + if (resourceName1.equals(resourceName2)) { + return replica1.getPartitionName().compareTo(replica2.getPartitionName()); + } else { + return resourceName1.compareTo(resourceName2); + } + } else { + // Note we shall prioritize the replica with a higher state priority + return statePriority2 - statePriority1; + } + } else { + // If the baseline assignment contains the assignment, prioritize the replica. + return baselineAssignment.containsKey(resourceName1) ? -1 : 1; + } + } else { + // If the best possible assignment contains the assignment, prioritize the replica. + return bestPossibleAssignment.containsKey(resourceName1) ? 
-1 : 1; + } + }); + return orderedAssignableReplicas; + } } diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/NodeMaxPartitionLimitConstraint.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/NodeMaxPartitionLimitConstraint.java index 9d0752be6c..cda53292f3 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/NodeMaxPartitionLimitConstraint.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/NodeMaxPartitionLimitConstraint.java @@ -28,9 +28,12 @@ class NodeMaxPartitionLimitConstraint extends HardConstraint { @Override boolean isAssignmentValid(AssignableNode node, AssignableReplica replica, ClusterContext clusterContext) { - return node.getAssignedReplicaCount() < node.getMaxPartition() - && node.getAssignedPartitionsByResource(replica.getResourceName()).size() < replica - .getResourceMaxPartitionsPerInstance(); + boolean exceedMaxPartitionLimit = + node.getMaxPartition() < 0 || node.getAssignedReplicaCount() < node.getMaxPartition(); + boolean exceedResourceMaxPartitionLimit = replica.getResourceMaxPartitionsPerInstance() < 0 + || node.getAssignedPartitionsByResource(replica.getResourceName()).size() < replica + .getResourceMaxPartitionsPerInstance(); + return exceedMaxPartitionLimit && exceedResourceMaxPartitionLimit; } @Override diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java index a3460fb5cb..20de6da401 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java @@ -19,8 +19,6 @@ * under the License. 
*/ -import static java.lang.Math.max; - import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -287,16 +285,23 @@ public int getMaxPartition() { * Any missing field will cause an invalid topology config exception. */ private String computeFaultZone(ClusterConfig clusterConfig, InstanceConfig instanceConfig) { - if (clusterConfig.isTopologyAwareEnabled()) { - String topologyStr = clusterConfig.getTopology(); - String faultZoneType = clusterConfig.getFaultZoneType(); - if (topologyStr == null || faultZoneType == null) { - throw new HelixException("Fault zone or cluster topology information is not configured."); - } - + if (!clusterConfig.isTopologyAwareEnabled()) { + // Instance name is the default fault zone if topology awareness is false. + return instanceConfig.getInstanceName(); + } + String topologyStr = clusterConfig.getTopology(); + String faultZoneType = clusterConfig.getFaultZoneType(); + if (topologyStr == null || faultZoneType == null) { + LOG.debug("Topology configuration is not complete. Topology define: {}, Fault Zone Type: {}", + topologyStr, faultZoneType); + // Use the instance name, or the deprecated ZoneId field (if exists) as the default fault zone. + String zoneId = instanceConfig.getZoneId(); + return zoneId == null ? instanceConfig.getInstanceName() : zoneId; + } else { + // Get the fault zone information from the complete topology definition. 
String[] topologyDef = topologyStr.trim().split("/"); - if (topologyDef.length == 0 - || Arrays.stream(topologyDef).noneMatch(type -> type.equals(faultZoneType))) { + if (topologyDef.length == 0 || + Arrays.stream(topologyDef).noneMatch(type -> type.equals(faultZoneType))) { throw new HelixException( "The configured topology definition is empty or does not contain the fault zone type."); } @@ -324,10 +329,6 @@ private String computeFaultZone(ClusterConfig clusterConfig, InstanceConfig inst } return faultZoneStringBuilder.toString(); } - } else { - // For backward compatibility - String zoneId = instanceConfig.getZoneId(); - return zoneId == null ? instanceConfig.getInstanceName() : zoneId; } } @@ -356,7 +357,7 @@ private void updateCapacityAndUtilization(String capacityKey, int valueToSubtrac // For the purpose of constraint calculation, the max utilization cannot be larger than 100%. float utilization = Math.min( (float) (_maxCapacity.get(capacityKey) - newCapacity) / _maxCapacity.get(capacityKey), 1); - _highestCapacityUtilization = max(_highestCapacityUtilization, utilization); + _highestCapacityUtilization = Math.max(_highestCapacityUtilization, utilization); } // else if the capacityKey does not exist in the capacity map, this method essentially becomes // a NOP; in other words, this node will be treated as if it has unlimited capacity. 
@@ -394,4 +395,9 @@ public int hashCode() { public int compareTo(AssignableNode o) { return _instanceName.compareTo(o.getInstanceName()); } + + @Override + public String toString() { + return _instanceName; + } } diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java index 66bd7b774e..a651e1924d 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java @@ -20,17 +20,22 @@ */ import java.io.IOException; +import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.helix.model.ClusterConfig; import org.apache.helix.model.ResourceConfig; import org.apache.helix.model.StateModelDefinition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * This class represents a partition replication that needs to be allocated. */ public class AssignableReplica implements Comparable { + private static final Logger LOG = LoggerFactory.getLogger(AssignableReplica.class); + private final String _partitionName; private final String _resourceName; private final String _resourceInstanceGroupTag; @@ -149,9 +154,10 @@ private Map fetchCapacityUsage(String partitionName, partitionCapacity = capacityMap.get(ResourceConfig.DEFAULT_PARTITION_KEY); } if (partitionCapacity == null) { - throw new IllegalArgumentException(String.format( - "The capacity usage of the specified partition %s is not configured in the Resource Config %s. No default partition capacity is configured neither.", - partitionName, resourceConfig.getResourceName())); + LOG.warn("The capacity usage of the specified partition {} is not configured in the Resource" + + " Config {}. No default partition capacity is configured either. 
Will proceed with" + + " empty capacity configuration.", partitionName, resourceConfig.getResourceName()); + partitionCapacity = new HashMap<>(); } List requiredCapacityKeys = clusterConfig.getInstanceCapacityKeys(); diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java index af1a8d8d02..2b534220d3 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java @@ -200,6 +200,9 @@ private static Map> parseAllReplicas( for (String resourceName : resourceMap.keySet()) { ResourceConfig resourceConfig = dataProvider.getResourceConfig(resourceName); + if (resourceConfig == null) { + resourceConfig = new ResourceConfig(resourceName); + } IdealState is = dataProvider.getIdealState(resourceName); if (is == null) { throw new HelixException( @@ -223,6 +226,7 @@ private static Map> parseAllReplicas( for (Map.Entry entry : stateCountMap.entrySet()) { String state = entry.getKey(); for (int i = 0; i < entry.getValue(); i++) { + mergeIdealStateWithResourceConfig(resourceConfig, is); totalReplicaMap.computeIfAbsent(resourceName, key -> new HashSet<>()).add( new AssignableReplica(clusterConfig, resourceConfig, partition, state, def.getStatePriorityMap().get(state))); @@ -233,6 +237,32 @@ private static Map> parseAllReplicas( return totalReplicaMap; } + /** + * For backward compatibility, propagate the critical simple fields from the IdealState to + * the Resource Config. + * Eventually, Resource Config should be the only metadata node that contains the required information. 
+ */ + private static void mergeIdealStateWithResourceConfig(ResourceConfig resourceConfig, + final IdealState idealState) { + // Note that the config fields get updated in this method shall be fully compatible with ones in the IdealState. + // 1. The fields shall have exactly the same meaning. + // 2. The value shall be exactly compatible, no additional calculation involved. + // 3. Resource Config items have a high priority. + // This is to ensure the resource config is not polluted after the merge. + if (null == resourceConfig.getRecord() + .getSimpleField(ResourceConfig.ResourceConfigProperty.INSTANCE_GROUP_TAG.name())) { + resourceConfig.getRecord() + .setSimpleField(ResourceConfig.ResourceConfigProperty.INSTANCE_GROUP_TAG.name(), + idealState.getInstanceGroupTag()); + } + if (null == resourceConfig.getRecord() + .getSimpleField(ResourceConfig.ResourceConfigProperty.MAX_PARTITIONS_PER_INSTANCE.name())) { + resourceConfig.getRecord() + .setIntField(ResourceConfig.ResourceConfigProperty.MAX_PARTITIONS_PER_INSTANCE.name(), + idealState.getMaxPartitionsPerInstance()); + } + } + /** * @return A map contains the assignments for each fault zone. > */ diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/OptimalAssignment.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/OptimalAssignment.java index 31cb1818b2..138f30c58e 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/OptimalAssignment.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/OptimalAssignment.java @@ -19,38 +19,64 @@ * under the License. 
*/ +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.helix.HelixException; +import org.apache.helix.model.Partition; import org.apache.helix.model.ResourceAssignment; /** * The data model represents the optimal assignment of N replicas assigned to M instances; * It's mostly used as the return parameter of an assignment calculation algorithm; If the algorithm - * failed to find optimal assignment given the endeavor, the user could check the failure reasons + * failed to find optimal assignment given the endeavor, the user could check the failure reasons. + * Note that this class is not thread safe. */ public class OptimalAssignment { private Map> _optimalAssignment = new HashMap<>(); private Map>> _failedAssignments = new HashMap<>(); - public OptimalAssignment() { - - } - + /** + * Update the OptimalAssignment instance with the existing assignment recorded in the input cluster model. + * + * @param clusterModel + */ public void updateAssignments(ClusterModel clusterModel) { - + _optimalAssignment.clear(); + clusterModel.getAssignableNodes().values().stream() + .forEach(node -> _optimalAssignment.put(node, new ArrayList<>(node.getAssignedReplicas()))); } - // TODO: determine the output of final assignment format + /** + * @return The optimal assignment in the form of a map. + */ public Map getOptimalResourceAssignment() { - throw new UnsupportedOperationException("Not implemented yet"); - } - - // TODO: the convert method is not the best choice so far, will revisit the data model - public OptimalAssignment convertFrom(ClusterModel clusterModel) { - return this; + if (hasAnyFailure()) { + throw new HelixException( + "Cannot get the optimal resource assignment since a calculation failure is recorded. 
" + + getFailures()); + } + Map assignmentMap = new HashMap<>(); + for (AssignableNode node : _optimalAssignment.keySet()) { + for (AssignableReplica replica : _optimalAssignment.get(node)) { + String resourceName = replica.getResourceName(); + Partition partition = new Partition(replica.getPartitionName()); + ResourceAssignment resourceAssignment = assignmentMap + .computeIfAbsent(resourceName, key -> new ResourceAssignment(resourceName)); + Map partitionStateMap = resourceAssignment.getReplicaMap(partition); + if (partitionStateMap.isEmpty()) { + // ResourceAssignment returns immutable empty map while no such assignment recorded yet. + // So if the returned map is empty, create a new map. + partitionStateMap = new HashMap<>(); + } + partitionStateMap.put(node.getInstanceName(), replica.getReplicaState()); + resourceAssignment.addReplicaMap(partition, partitionStateMap); + } + } + return assignmentMap; } public void recordAssignmentFailure(AssignableReplica replica, diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java index 4df8e8dde2..452631ca8c 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java @@ -19,6 +19,14 @@ * under the License. 
*/ +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.stream.Collectors; + import org.apache.helix.HelixException; import org.apache.helix.HelixManager; import org.apache.helix.HelixRebalanceException; @@ -48,13 +56,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.concurrent.Callable; - /** * For partition compute best possible (instance,state) pair based on * IdealState,StateModel,LiveInstance @@ -114,67 +115,46 @@ private BestPossibleStateOutput compute(ClusterEvent event, Map newIdealStates = new HashMap<>(); - Map preferences = cache.getClusterConfig() - .getGlobalRebalancePreference(); - WagedRebalancer wagedRebalancer = new WagedRebalancer(helixManager, preferences); - try { - newIdealStates - .putAll(wagedRebalancer.computeNewIdealStates(cache, resourceMap, currentStateOutput)); - } catch (HelixRebalanceException ex) { - // Note that unlike the legacy rebalancer, the WAGED rebalance won't return partial result. - // Since it calculates for all the eligible resources globally, a partial result is invalid. - // TODO propagate the rebalancer failure information to updateRebalanceStatus for monitoring. 
- LogUtil.logError(logger, _eventId, String - .format("Failed to calculate the new Ideal States using the rebalancer %s due to %s", - wagedRebalancer.getClass().getSimpleName(), ex.getFailureType()), ex); - } + boolean isValid = + validateOfflineInstancesLimit(cache, event.getAttribute(AttributeName.helixmanager.name())); final List failureResources = new ArrayList<>(); - Iterator itr = resourceMap.values().iterator(); + + Map calculatedResourceMap = + computeResourceBestPossibleStateWithWagedRebalancer(cache, currentStateOutput, helixManager, + resourceMap, output, failureResources); + + Map remainingResourceMap = new HashMap<>(resourceMap); + remainingResourceMap.keySet().removeAll(calculatedResourceMap.keySet()); + + // Fallback to the original single resource rebalancer calculation. + // This is required because we support mixed cluster that uses both WAGED rebalancer and the + // older rebalancers. + Iterator itr = remainingResourceMap.values().iterator(); while (itr.hasNext()) { Resource resource = itr.next(); boolean result = false; - IdealState is = newIdealStates.get(resource.getResourceName()); - if (is != null) { - // 2. Check if the WAGED rebalancer has calculated for this resource or not. - result = checkBestPossibleStateCalculation(is); - if (result) { - // The WAGED rebalancer calculates a valid result, record in the output - updateBestPossibleStateOutput(output, resource, is); - } - } else { - // 3. The WAGED rebalancer skips calculating the resource assignment, fallback to use a - // legacy resource rebalancer if applicable. - // If this calculation fails, the resource will be reported in the failureResources list. 
- try { - result = - computeSingleResourceBestPossibleState(event, cache, currentStateOutput, resource, - output); - } catch (HelixException ex) { - LogUtil.logError(logger, _eventId, - "Exception when calculating best possible states for " + resource.getResourceName(), - ex); - } + try { + result = computeSingleResourceBestPossibleState(event, cache, currentStateOutput, resource, + output); + } catch (HelixException ex) { + LogUtil.logError(logger, _eventId, String + .format("Exception when calculating best possible states for %s", + resource.getResourceName()), ex); + } if (!result) { failureResources.add(resource.getResourceName()); - LogUtil.logWarn(logger, _eventId, - "Failed to calculate best possible states for " + resource.getResourceName()); + LogUtil.logWarn(logger, _eventId, String + .format("Failed to calculate best possible states for %s", resource.getResourceName())); } } // Check and report if resource rebalance has failure updateRebalanceStatus(!isValid || !failureResources.isEmpty(), failureResources, helixManager, - cache, clusterStatusMonitor, - "Failed to calculate best possible states for " + failureResources.size() + " resources."); + cache, clusterStatusMonitor, String + .format("Failed to calculate best possible states for %d resources.", + failureResources.size())); return output; } @@ -238,6 +218,69 @@ private boolean validateOfflineInstancesLimit(final ResourceControllerDataProvid return true; } + /** + * Rebalance with the WAGED rebalancer + * The rebalancer only calculates the new ideal assignment for all the resources that are + * configured to use the WAGED rebalancer. + * + * @param cache Cluster data cache. + * @param currentStateOutput The current state information. + * @param helixManager + * @param resourceMap The complete resource map. The method will filter the map for the compatible resources. + * @param output The best possible state output. 
+ * @param failureResources The failure records that will be updated if any resource cannot be computed. + * @return The map of all the calculated resources. + */ + private Map computeResourceBestPossibleStateWithWagedRebalancer( + ResourceControllerDataProvider cache, CurrentStateOutput currentStateOutput, + HelixManager helixManager, Map resourceMap, BestPossibleStateOutput output, + List failureResources) { + // Find the compatible resources: 1. FULL_AUTO 2. Configured to use the WAGED rebalancer + Map wagedRebalancedResourceMap = + resourceMap.entrySet().stream().filter(resourceEntry -> { + IdealState is = cache.getIdealState(resourceEntry.getKey()); + return is != null && is.getRebalanceMode().equals(IdealState.RebalanceMode.FULL_AUTO) + && WagedRebalancer.class.getName().equals(is.getRebalancerClassName()); + }).collect(Collectors.toMap(resourceEntry -> resourceEntry.getKey(), + resourceEntry -> resourceEntry.getValue())); + + Map newIdealStates = new HashMap<>(); + + // Init rebalancer with the rebalance preferences. + Map preferences = cache.getClusterConfig() + .getGlobalRebalancePreference(); + // TODO avoid creating the rebalancer on every rebalance call for performance enhancement + WagedRebalancer wagedRebalancer = new WagedRebalancer(helixManager, preferences); + try { + newIdealStates.putAll(wagedRebalancer + .computeNewIdealStates(cache, wagedRebalancedResourceMap, currentStateOutput)); + } catch (HelixRebalanceException ex) { + // Note that unlike the legacy rebalancer, the WAGED rebalance won't return partial result. + // Since it calculates for all the eligible resources globally, a partial result is invalid. + // TODO propagate the rebalancer failure information to updateRebalanceStatus for monitoring. 
+ LogUtil.logError(logger, _eventId, String + .format("Failed to calculate the new Ideal States using the rebalancer %s due to %s", + wagedRebalancer.getClass().getSimpleName(), ex.getFailureType()), ex); + } finally { + wagedRebalancer.close(); + } + Iterator itr = wagedRebalancedResourceMap.values().iterator(); + while (itr.hasNext()) { + Resource resource = itr.next(); + IdealState is = newIdealStates.get(resource.getResourceName()); + // Check if the WAGED rebalancer has calculated the result for this resource or not. + if (is != null && checkBestPossibleStateCalculation(is)) { + // The WAGED rebalancer calculates a valid result, record in the output + updateBestPossibleStateOutput(output, resource, is); + } else { + failureResources.add(resource.getResourceName()); + LogUtil.logWarn(logger, _eventId, + "Failed to calculate best possible states for " + resource.getResourceName()); + } + } + return wagedRebalancedResourceMap; + } + private void updateBestPossibleStateOutput(BestPossibleStateOutput output, Resource resource, IdealState computedIdealState) { output.setPreferenceLists(resource.getResourceName(), computedIdealState.getPreferenceLists()); diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java index 24c7c8e93b..a11da29f0e 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java @@ -25,6 +25,7 @@ import java.util.Arrays; import java.util.List; import org.I0Itec.zkclient.exception.ZkMarshallingError; +import org.I0Itec.zkclient.exception.ZkNoNodeException; import org.I0Itec.zkclient.serialize.ZkSerializer; import org.apache.helix.AccessOption; import org.apache.helix.BaseDataAccessor; @@ -215,7 +216,7 @@ private HelixProperty compressedBucketRead(String path) { ZNRecord metadataRecord = 
_znRecordBaseDataAccessor.get(path, null, AccessOption.PERSISTENT); if (metadataRecord == null) { - throw new HelixException( + throw new ZkNoNodeException( String.format("Metadata ZNRecord does not exist for path: %s", path)); } diff --git a/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java index 03338b4894..b9284b9a23 100644 --- a/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java +++ b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java @@ -30,9 +30,9 @@ import java.util.Map; import java.util.Set; import java.util.logging.Level; - import javax.management.MBeanServerConnection; import javax.management.ObjectName; + import org.I0Itec.zkclient.IZkStateListener; import org.I0Itec.zkclient.ZkServer; import org.apache.helix.BaseDataAccessor; @@ -54,6 +54,7 @@ import org.apache.helix.controller.pipeline.StageContext; import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer; import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy; +import org.apache.helix.controller.rebalancer.waged.WagedRebalancer; import org.apache.helix.controller.stages.AttributeName; import org.apache.helix.controller.stages.ClusterEvent; import org.apache.helix.manager.zk.ZKHelixAdmin; @@ -347,6 +348,19 @@ protected IdealState createResourceWithDelayedRebalance(String clusterName, Stri protected IdealState createResourceWithDelayedRebalance(String clusterName, String db, String stateModel, int numPartition, int replica, int minActiveReplica, long delay, String rebalanceStrategy) { + return createResource(clusterName, db, stateModel, numPartition, replica, minActiveReplica, + delay, DelayedAutoRebalancer.class.getName(), rebalanceStrategy); + } + + protected IdealState createResourceWithWagedRebalance(String clusterName, String db, + String stateModel, int numPartition, int replica, int minActiveReplica, long delay) { + return createResource(clusterName, db, 
stateModel, numPartition, replica, minActiveReplica, + delay, WagedRebalancer.class.getName(), null); + } + + private IdealState createResource(String clusterName, String db, String stateModel, + int numPartition, int replica, int minActiveReplica, long delay, String rebalancerClassName, + String rebalanceStrategy) { IdealState idealState = _gSetupTool.getClusterManagementTool().getResourceIdealState(clusterName, db); if (idealState == null) { @@ -362,7 +376,7 @@ protected IdealState createResourceWithDelayedRebalance(String clusterName, Stri if (delay > 0) { idealState.setRebalanceDelay(delay); } - idealState.setRebalancerClassName(DelayedAutoRebalancer.class.getName()); + idealState.setRebalancerClassName(rebalancerClassName); _gSetupTool.getClusterManagementTool().setResourceIdealState(clusterName, db, idealState); _gSetupTool.rebalanceStorageCluster(clusterName, db, replica); idealState = _gSetupTool.getClusterManagementTool().getResourceIdealState(clusterName, db); diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/MockAssignmentMetadataStore.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/MockAssignmentMetadataStore.java index 3371c8b4db..7d054160b3 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/MockAssignmentMetadataStore.java +++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/MockAssignmentMetadataStore.java @@ -23,6 +23,7 @@ import java.util.Map; import org.apache.helix.BucketDataAccessor; import org.apache.helix.model.ResourceAssignment; +import org.mockito.Mockito; /** * A mock up metadata store for unit test. 
@@ -32,8 +33,8 @@ public class MockAssignmentMetadataStore extends AssignmentMetadataStore { private Map _persistGlobalBaseline = new HashMap<>(); private Map _persistBestPossibleAssignment = new HashMap<>(); - MockAssignmentMetadataStore(BucketDataAccessor bucketDataAccessor, String clusterName) { - super(bucketDataAccessor, clusterName); + MockAssignmentMetadataStore() { + super(Mockito.mock(BucketDataAccessor.class), ""); } public Map getBaseline() { @@ -53,6 +54,10 @@ public void persistBestPossibleAssignment( _persistBestPossibleAssignment = bestPossibleAssignment; } + public void close() { + // do nothing + } + public void clearMetadataStore() { _persistBestPossibleAssignment.clear(); _persistGlobalBaseline.clear(); diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestAssignmentMetadataStore.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestAssignmentMetadataStore.java index 922915fdf4..ecd2af3769 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestAssignmentMetadataStore.java +++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestAssignmentMetadataStore.java @@ -20,6 +20,7 @@ */ import java.util.Map; + import org.apache.helix.HelixManager; import org.apache.helix.HelixManagerFactory; import org.apache.helix.InstanceType; @@ -28,6 +29,7 @@ import org.apache.helix.integration.manager.MockParticipantManager; import org.apache.helix.model.ResourceAssignment; import org.testng.Assert; +import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -79,7 +81,15 @@ public void beforeClass() throws Exception { _manager.connect(); // create AssignmentMetadataStore - _store = new AssignmentMetadataStore(_manager); + _store = new AssignmentMetadataStore(_manager.getMetadataStoreConnectionString(), + _manager.getClusterName()); + } + + @AfterClass + public void afterClass() { + if 
(_store != null) { + _store.close(); + } } /** @@ -91,11 +101,7 @@ public void beforeClass() throws Exception { */ @Test public void testReadEmptyBaseline() { - try { - Map baseline = _store.getBaseline(); - Assert.fail("Should fail because there shouldn't be any data."); - } catch (Exception e) { - // OK - } + Map baseline = _store.getBaseline(); + Assert.assertTrue(baseline.isEmpty()); } } diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java index d6fd99bf54..e7368be03e 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java +++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java @@ -26,7 +26,7 @@ import java.util.Map; import java.util.Set; import java.util.stream.Collectors; -import org.apache.helix.BucketDataAccessor; + import org.apache.helix.HelixConstants; import org.apache.helix.HelixRebalanceException; import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; @@ -47,8 +47,9 @@ import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; -import static org.mockito.Matchers.*; -import static org.mockito.Mockito.*; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.when; public class TestWagedRebalancer extends AbstractTestClusterModel { private Set _instances; @@ -63,9 +64,7 @@ public void initialize() { _algorithm = new MockRebalanceAlgorithm(); // Initialize a mock assignment metadata store - BucketDataAccessor mockAccessor = Mockito.mock(BucketDataAccessor.class); - String clusterName = ""; // an empty string for testing purposes - _metadataStore = new MockAssignmentMetadataStore(mockAccessor, clusterName); + _metadataStore = new MockAssignmentMetadataStore(); } @Override @@ -181,9 +180,9 @@ public 
void testRebalanceWithCurrentState() throws IOException, HelixRebalanceEx String resourceName = csEntry.getKey(); CurrentState cs = csEntry.getValue(); for (Map.Entry partitionStateEntry : cs.getPartitionStateMap().entrySet()) { - currentStateOutput.setCurrentState(resourceName, - new Partition(partitionStateEntry.getKey()), instanceName, - partitionStateEntry.getValue()); + currentStateOutput + .setCurrentState(resourceName, new Partition(partitionStateEntry.getKey()), + instanceName, partitionStateEntry.getValue()); } } } @@ -216,7 +215,7 @@ public void testRebalanceWithCurrentState() throws IOException, HelixRebalanceEx "DROPPED"); } - @Test(dependsOnMethods = "testRebalance") + @Test(dependsOnMethods = "testRebalance", expectedExceptions = HelixRebalanceException.class, expectedExceptionsMessageRegExp = "Input contains invalid resource\\(s\\) that cannot be rebalanced by the WAGED rebalancer. \\[Resource1\\] Failure Type: INVALID_INPUT") public void testNonCompatibleConfiguration() throws IOException, HelixRebalanceException { _metadataStore.clearMetadataStore(); WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore, _algorithm); @@ -233,12 +232,7 @@ public void testNonCompatibleConfiguration() throws IOException, HelixRebalanceE .forEach(partition -> resource.addPartition(partition)); return resource; })); - Map newIdealStates = - rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput()); - Map algorithmResult = _algorithm.getRebalanceResult(); - // The output shall not contains the nonCompatibleResource. - resourceMap.remove(nonCompatibleResourceName); - validateRebalanceResult(resourceMap, newIdealStates, algorithmResult); + rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput()); } // TODO test with invalid capacity configuration which will fail the cluster model constructing. 
@@ -283,7 +277,7 @@ public void testInvalidRebalancerStatus() throws IOException { Assert.assertEquals(ex.getFailureType(), HelixRebalanceException.Type.INVALID_REBALANCER_STATUS); Assert.assertEquals(ex.getMessage(), - "Failed to get the persisted assignment records. Failure Type: INVALID_REBALANCER_STATUS"); + "Failed to get the current baseline assignment because of unexpected error. Failure Type: INVALID_REBALANCER_STATUS"); } } @@ -425,8 +419,9 @@ private void validateRebalanceResult(Map resourceMap, Assert.assertTrue(newIdealStates.containsKey(resourceName)); IdealState is = newIdealStates.get(resourceName); ResourceAssignment assignment = expectedResult.get(resourceName); - Assert.assertEquals(is.getPartitionSet(), new HashSet<>(assignment.getMappedPartitions() - .stream().map(partition -> partition.getPartitionName()).collect(Collectors.toSet()))); + Assert.assertEquals(is.getPartitionSet(), new HashSet<>( + assignment.getMappedPartitions().stream().map(partition -> partition.getPartitionName()) + .collect(Collectors.toSet()))); for (String partitionName : is.getPartitionSet()) { Assert.assertEquals(is.getInstanceStateMap(partitionName), assignment.getReplicaMap(new Partition(partitionName))); diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/MockRebalanceAlgorithm.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/MockRebalanceAlgorithm.java index 2a39482076..759c6857f0 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/MockRebalanceAlgorithm.java +++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/MockRebalanceAlgorithm.java @@ -72,7 +72,7 @@ public OptimalAssignment calculate(ClusterModel clusterModel) { _resultHistory = result; - // TODO remove this mockito when OptimalAssignment.getOptimalResourceAssignment is ready. + // Mock the return value for supporting test. 
OptimalAssignment optimalAssignment = Mockito.mock(OptimalAssignment.class); when(optimalAssignment.getOptimalResourceAssignment()).thenReturn(result); return optimalAssignment; diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java index 0f799b36a2..91db0762b6 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java +++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java @@ -103,8 +103,8 @@ protected ResourceControllerDataProvider setupClusterDataCache() throws IOExcept ClusterConfig testClusterConfig = new ClusterConfig("testClusterConfigId"); testClusterConfig.setMaxPartitionsPerInstance(5); testClusterConfig.setDisabledInstances(Collections.emptyMap()); - testClusterConfig.setTopologyAwareEnabled(false); testClusterConfig.setInstanceCapacityKeys(new ArrayList<>(_capacityDataMap.keySet())); + testClusterConfig.setTopologyAwareEnabled(true); when(testCache.getClusterConfig()).thenReturn(testClusterConfig); // 3. Mock the live instance node for the default instance. diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestOptimalAssignment.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestOptimalAssignment.java new file mode 100644 index 0000000000..bd820a977c --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestOptimalAssignment.java @@ -0,0 +1,91 @@ +package org.apache.helix.controller.rebalancer.waged.model; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.Map; + +import org.apache.helix.HelixException; +import org.apache.helix.model.Partition; +import org.apache.helix.model.ResourceAssignment; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +public class TestOptimalAssignment extends ClusterModelTestHelper { + + @BeforeClass + public void initialize() { + super.initialize(); + } + + @Test + public void testUpdateAssignment() throws IOException { + OptimalAssignment assignment = new OptimalAssignment(); + + // update with empty cluster model + assignment.updateAssignments(getDefaultClusterModel()); + Map optimalAssignmentMap = + assignment.getOptimalResourceAssignment(); + Assert.assertEquals(optimalAssignmentMap, Collections.emptyMap()); + + // update with valid assignment + ClusterModel model = getDefaultClusterModel(); + model.assign(_resourceNames.get(0), _partitionNames.get(1), "SLAVE", _testInstanceId); + model.assign(_resourceNames.get(0), _partitionNames.get(0), "MASTER", _testInstanceId); + assignment.updateAssignments(model); + optimalAssignmentMap = assignment.getOptimalResourceAssignment(); + Assert.assertEquals(optimalAssignmentMap.get(_resourceNames.get(0)).getMappedPartitions(), + Arrays + .asList(new Partition(_partitionNames.get(0)), new 
Partition(_partitionNames.get(1)))); + Assert.assertEquals(optimalAssignmentMap.get(_resourceNames.get(0)) + .getReplicaMap(new Partition(_partitionNames.get(1))), + Collections.singletonMap(_testInstanceId, "SLAVE")); + Assert.assertEquals(optimalAssignmentMap.get(_resourceNames.get(0)) + .getReplicaMap(new Partition(_partitionNames.get(0))), + Collections.singletonMap(_testInstanceId, "MASTER")); + } + + @Test(dependsOnMethods = "testUpdateAssignment") + public void TestAssignmentFailure() throws IOException { + OptimalAssignment assignment = new OptimalAssignment(); + ClusterModel model = getDefaultClusterModel(); + + // record failure + AssignableReplica targetFailureReplica = + model.getAssignableReplicaMap().get(_resourceNames.get(0)).iterator().next(); + AssignableNode targetFailureNode = model.getAssignableNodes().get(_testInstanceId); + assignment.recordAssignmentFailure(targetFailureReplica, Collections + .singletonMap(targetFailureNode, Collections.singletonList("Assignment Failure!"))); + + Assert.assertTrue(assignment.hasAnyFailure()); + + assignment.updateAssignments(getDefaultClusterModel()); + try { + assignment.getOptimalResourceAssignment(); + Assert.fail("Get optimal assignment shall fail because of the failure record."); + } catch (HelixException ex) { + Assert.assertTrue(ex.getMessage().startsWith( + "Cannot get the optimal resource assignment since a calculation failure is recorded.")); + } + } +} diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java new file mode 100644 index 0000000000..67de90a376 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java @@ -0,0 +1,476 @@ +package org.apache.helix.integration.rebalancer.WagedRebalancer; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * 
or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.helix.ConfigAccessor; +import org.apache.helix.TestHelper; +import org.apache.helix.common.ZkTestBase; +import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy; +import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy; +import org.apache.helix.integration.manager.ClusterControllerManager; +import org.apache.helix.integration.manager.MockParticipantManager; +import org.apache.helix.model.BuiltInStateModelDefinitions; +import org.apache.helix.model.ClusterConfig; +import org.apache.helix.model.ExternalView; +import org.apache.helix.model.IdealState; +import org.apache.helix.model.InstanceConfig; +import org.apache.helix.tools.ClusterVerifiers.HelixClusterVerifier; +import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier; +import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeClass; +import 
org.testng.annotations.Test; + +public class TestWagedRebalance extends ZkTestBase { + protected final int NUM_NODE = 6; + protected static final int START_PORT = 12918; + protected static final int PARTITIONS = 20; + protected static final int TAGS = 2; + + protected final String CLASS_NAME = getShortClassName(); + protected final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME; + protected ClusterControllerManager _controller; + + List _participants = new ArrayList<>(); + Map _nodeToTagMap = new HashMap<>(); + List _nodes = new ArrayList<>(); + private Set _allDBs = new HashSet<>(); + private int _replica = 3; + + private static String[] _testModels = { BuiltInStateModelDefinitions.OnlineOffline.name(), + BuiltInStateModelDefinitions.MasterSlave.name(), + BuiltInStateModelDefinitions.LeaderStandby.name() + }; + + @BeforeClass + public void beforeClass() throws Exception { + System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis())); + + _gSetupTool.addCluster(CLUSTER_NAME, true); + + for (int i = 0; i < NUM_NODE; i++) { + String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i); + addInstanceConfig(storageNodeName, i, TAGS); + } + + // start dummy participants + for (String node : _nodes) { + MockParticipantManager participant = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, node); + participant.syncStart(); + _participants.add(participant); + } + + // start controller + String controllerName = CONTROLLER_PREFIX + "_0"; + _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName); + _controller.syncStart(); + + enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true); + } + + protected void addInstanceConfig(String storageNodeName, int seqNo, int tagCount) { + _gSetupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName); + String tag = "tag-" + seqNo % tagCount; + _gSetupTool.getClusterManagementTool().addInstanceTag(CLUSTER_NAME, storageNodeName, tag); + 
_nodeToTagMap.put(storageNodeName, tag); + _nodes.add(storageNodeName); + } + + @Test + public void test() throws Exception { + int i = 0; + for (String stateModel : _testModels) { + String db = "Test-DB-" + i++; + createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica, _replica, + -1); + _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica); + _allDBs.add(db); + } + Thread.sleep(300); + + validate(_replica); + + // Adding 3 more resources + i = 0; + for (String stateModel : _testModels) { + String moreDB = "More-Test-DB-" + i++; + createResourceWithWagedRebalance(CLUSTER_NAME, moreDB, stateModel, PARTITIONS, _replica, + _replica, -1); + _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, moreDB, _replica); + _allDBs.add(moreDB); + + Thread.sleep(300); + + validate(_replica); + } + + // Drop each of the 3 additional resources; j is advanced by the loop header only + for (int j = 0; j < 3; j++) { + String moreDB = "More-Test-DB-" + j; + _gSetupTool.dropResourceFromCluster(CLUSTER_NAME, moreDB); + _allDBs.remove(moreDB); + + Thread.sleep(300); + + validate(_replica); + } + } + + @Test(dependsOnMethods = "test") + public void testWithInstanceTag() throws Exception { + Set tags = new HashSet(_nodeToTagMap.values()); + int i = 3; + for (String tag : tags) { + String db = "Test-DB-" + i++; + createResourceWithWagedRebalance(CLUSTER_NAME, db, + BuiltInStateModelDefinitions.MasterSlave.name(), PARTITIONS, _replica, _replica, -1); + IdealState is = + _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); + is.setInstanceGroupTag(tag); + _gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, db, is); + _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica); + _allDBs.add(db); + } + Thread.sleep(300); + validate(_replica); + } + + @Test(dependsOnMethods = "test") + public void testChangeIdealState() throws InterruptedException { + String dbName = "Test-DB"; + createResourceWithWagedRebalance(CLUSTER_NAME, dbName, +
BuiltInStateModelDefinitions.MasterSlave.name(), PARTITIONS, _replica, _replica, -1); + _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, dbName, _replica); + _allDBs.add(dbName); + Thread.sleep(300); + + validate(_replica); + + // Adjust the replica count + IdealState is = + _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, dbName); + int newReplicaFactor = _replica - 1; + is.setReplicas("" + newReplicaFactor); + _gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, dbName, is); + Thread.sleep(300); + + validate(newReplicaFactor); + + // Adjust the partition list + is = _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, dbName); + is.setNumPartitions(PARTITIONS + 1); + _gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, dbName, is); + _gSetupTool.getClusterManagementTool().rebalance(CLUSTER_NAME, dbName, newReplicaFactor); + Thread.sleep(300); + + validate(newReplicaFactor); + ExternalView ev = + _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, dbName); + Assert.assertEquals(ev.getPartitionSet().size(), PARTITIONS + 1); + } + + @Test(dependsOnMethods = "test") + public void testDisableInstance() throws InterruptedException { + String dbName = "Test-DB"; + createResourceWithWagedRebalance(CLUSTER_NAME, dbName, + BuiltInStateModelDefinitions.MasterSlave.name(), PARTITIONS, _replica, _replica, -1); + _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, dbName, _replica); + _allDBs.add(dbName); + Thread.sleep(300); + + validate(_replica); + + // Disable participants, keep only three left + Set disableParticipants = new HashSet<>(); + + try { + for (int i = 3; i < _participants.size(); i++) { + MockParticipantManager p = _participants.get(i); + disableParticipants.add(p.getInstanceName()); + InstanceConfig config = _gSetupTool.getClusterManagementTool() + .getInstanceConfig(CLUSTER_NAME, p.getInstanceName()); + config.setInstanceEnabled(false); + 
_gSetupTool.getClusterManagementTool() + .setInstanceConfig(CLUSTER_NAME, p.getInstanceName(), config); + } + Thread.sleep(300); + + validate(_replica); + + // Verify there is no assignment on the disabled participants. + ExternalView ev = + _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, dbName); + for (String partition : ev.getPartitionSet()) { + Map replicaStateMap = ev.getStateMap(partition); + for (String instance : replicaStateMap.keySet()) { + Assert.assertFalse(disableParticipants.contains(instance)); + } + } + } finally { + // recover the config + for (String instanceName : disableParticipants) { + InstanceConfig config = + _gSetupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME, instanceName); + config.setInstanceEnabled(true); + _gSetupTool.getClusterManagementTool() + .setInstanceConfig(CLUSTER_NAME, instanceName, config); + } + } + } + + @Test(dependsOnMethods = "testDisableInstance") + public void testLackEnoughLiveInstances() throws Exception { + // shutdown participants, keep only two left + for (int i = 2; i < _participants.size(); i++) { + _participants.get(i).syncStop(); + } + + int j = 0; + for (String stateModel : _testModels) { + String db = "Test-DB-" + j++; + createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica, _replica, + -1); + _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica); + _allDBs.add(db); + } + + Thread.sleep(300); + // Verify if the partitions get assigned + validate(2); + + // restart the participants within the zone + for (int i = 2; i < _participants.size(); i++) { + MockParticipantManager p = _participants.get(i); + MockParticipantManager newNode = + new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, p.getInstanceName()); + _participants.set(i, newNode); + newNode.syncStart(); + } + + Thread.sleep(300); + // Verify if the partitions get assigned + validate(_replica); + } + + @Test(dependsOnMethods = "testDisableInstance") + public void 
testLackEnoughInstances() throws Exception { + // shutdown participants, keep only two left + for (int i = 2; i < _participants.size(); i++) { + MockParticipantManager p = _participants.get(i); + p.syncStop(); + _gSetupTool.getClusterManagementTool() + .enableInstance(CLUSTER_NAME, p.getInstanceName(), false); + _gSetupTool.dropInstanceFromCluster(CLUSTER_NAME, p.getInstanceName()); + + } + + int j = 0; + for (String stateModel : _testModels) { + String db = "Test-DB-" + j++; + createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica, _replica, + -1); + _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica); + _allDBs.add(db); + } + + Thread.sleep(300); + // Verify if the partitions get assigned + validate(2); + + // Create new participants within the zone + for (int i = 2; i < _participants.size(); i++) { + MockParticipantManager p = _participants.get(i); + String replaceNodeName = p.getInstanceName() + "-replacement_" + START_PORT; + addInstanceConfig(replaceNodeName, i, TAGS); + MockParticipantManager newNode = + new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, replaceNodeName); + _participants.set(i, newNode); + newNode.syncStart(); + } + + Thread.sleep(300); + // Verify if the partitions get assigned + validate(_replica); + } + + @Test(dependsOnMethods = "test") + public void testMixedRebalancerUsage() throws InterruptedException { + int i = 0; + for (String stateModel : _testModels) { + String db = "Test-DB-" + i++; + if (i == 1) { // first resource (i was post-incremented above) + _gSetupTool.addResourceToCluster(CLUSTER_NAME, db, PARTITIONS, stateModel, + IdealState.RebalanceMode.FULL_AUTO + "", CrushRebalanceStrategy.class.getName()); + } else if (i == 2) { + _gSetupTool.addResourceToCluster(CLUSTER_NAME, db, PARTITIONS, stateModel, + IdealState.RebalanceMode.FULL_AUTO + "", CrushEdRebalanceStrategy.class.getName()); + } else { + createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica, + _replica, -1); + } +
_gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica); + _allDBs.add(db); + } + Thread.sleep(300); + + validate(_replica); + } + + @Test(dependsOnMethods = "test") + public void testMaxPartitionLimitation() throws Exception { + ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient); + ClusterConfig clusterConfig = configAccessor.getClusterConfig(CLUSTER_NAME); + // Change the cluster level config so no assignment can be done + clusterConfig.setMaxPartitionsPerInstance(1); + configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig); + try { + String limitedResourceName = null; + int i = 0; + for (String stateModel : _testModels) { + String db = "Test-DB-" + i++; + createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica, + _replica, -1); + if (i == 1) { + // The limited resource has additional limitation, so even the other resources can be assigned + // later, this resource will still be blocked by the max partition limitation. + limitedResourceName = db; + IdealState idealState = + _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); + idealState.setMaxPartitionsPerInstance(1); + _gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, db, idealState); + } + _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica); + _allDBs.add(db); + } + Thread.sleep(300); + + // Since the WAGED rebalancer does not do partial rebalance, the initial assignment won't show. 
+ Assert.assertFalse(TestHelper.verify(() -> _allDBs.stream().allMatch(db -> { + ExternalView ev = + _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); + return ev != null && !ev.getPartitionSet().isEmpty(); + }), 2000)); + + // Remove the cluster level limitation + clusterConfig.setMaxPartitionsPerInstance(-1); + configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig); + Thread.sleep(300); + + // wait until any of the resources is rebalanced + TestHelper.verify(() -> { + for (String db : _allDBs) { + ExternalView ev = + _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); + if (ev != null && !ev.getPartitionSet().isEmpty()) { + return true; + } + } + return false; + }, 3000); + ExternalView ev = _gSetupTool.getClusterManagementTool() + .getResourceExternalView(CLUSTER_NAME, limitedResourceName); + Assert.assertFalse(ev != null && !ev.getPartitionSet().isEmpty()); + + // Remove the resource level limitation + IdealState idealState = _gSetupTool.getClusterManagementTool() + .getResourceIdealState(CLUSTER_NAME, limitedResourceName); + idealState.setMaxPartitionsPerInstance(Integer.MAX_VALUE); + _gSetupTool.getClusterManagementTool() + .setResourceIdealState(CLUSTER_NAME, limitedResourceName, idealState); + + validate(_replica); + } finally { + // recover the config change + clusterConfig = configAccessor.getClusterConfig(CLUSTER_NAME); + clusterConfig.setMaxPartitionsPerInstance(-1); + configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig); + } + } + + private void validate(int expectedReplica) { + HelixClusterVerifier _clusterVerifier = + new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR) + .setResources(_allDBs).build(); + Assert.assertTrue(_clusterVerifier.verify(5000)); + for (String db : _allDBs) { + IdealState is = + _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); + ExternalView ev = + 
_gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); + validateIsolation(is, ev, expectedReplica); + } + } + + /** + * Validate that each partition has the expected replica count and only uses instances with the required tag. + */ + private void validateIsolation(IdealState is, ExternalView ev, int expectedReplica) { + String tag = is.getInstanceGroupTag(); + for (String partition : is.getPartitionSet()) { + Map assignmentMap = ev.getRecord().getMapField(partition); + Set instancesInEV = assignmentMap.keySet(); + Assert.assertEquals(instancesInEV.size(), expectedReplica); + for (String instance : instancesInEV) { + if (tag != null) { + InstanceConfig config = + _gSetupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME, instance); + Assert.assertTrue(config.containsTag(tag)); + } + } + } + } + + @AfterMethod + public void afterMethod() throws Exception { + for (String db : _allDBs) { + _gSetupTool.dropResourceFromCluster(CLUSTER_NAME, db); + } + _allDBs.clear(); + // Wait for all DBs to be dropped.
+ Thread.sleep(100); + ZkHelixClusterVerifier _clusterVerifier = + new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR) + .setResources(_allDBs).build(); + Assert.assertTrue(_clusterVerifier.verifyByPolling()); + } + + @AfterClass + public void afterClass() throws Exception { + if (_controller != null && _controller.isConnected()) { + _controller.syncStop(); + } + for (MockParticipantManager p : _participants) { + if (p != null && p.isConnected()) { + p.syncStop(); + } + } + deleteCluster(CLUSTER_NAME); + } +} diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalanceFaultZone.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalanceFaultZone.java new file mode 100644 index 0000000000..c8d97d098c --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalanceFaultZone.java @@ -0,0 +1,405 @@ +package org.apache.helix.integration.rebalancer.WagedRebalancer; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.helix.TestHelper; +import org.apache.helix.common.ZkTestBase; +import org.apache.helix.integration.manager.ClusterControllerManager; +import org.apache.helix.integration.manager.MockParticipantManager; +import org.apache.helix.model.BuiltInStateModelDefinitions; +import org.apache.helix.model.ExternalView; +import org.apache.helix.model.IdealState; +import org.apache.helix.model.InstanceConfig; +import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier; +import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +public class TestWagedRebalanceFaultZone extends ZkTestBase { + protected final int NUM_NODE = 6; + protected static final int START_PORT = 12918; + protected static final int PARTITIONS = 20; + protected static final int ZONES = 3; + protected static final int TAGS = 2; + + protected final String CLASS_NAME = getShortClassName(); + protected final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME; + protected ClusterControllerManager _controller; + + List _participants = new ArrayList<>(); + Map _nodeToZoneMap = new HashMap<>(); + Map _nodeToTagMap = new HashMap<>(); + List _nodes = new ArrayList<>(); + Set _allDBs = new HashSet<>(); + int _replica = 3; + + String[] _testModels = { + BuiltInStateModelDefinitions.OnlineOffline.name(), + BuiltInStateModelDefinitions.MasterSlave.name(), + BuiltInStateModelDefinitions.LeaderStandby.name() + }; + + @BeforeClass + public void beforeClass() throws Exception { + System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis())); + + 
_gSetupTool.addCluster(CLUSTER_NAME, true); + + for (int i = 0; i < NUM_NODE; i++) { + String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i); + addInstanceConfig(storageNodeName, i, ZONES, TAGS); + } + + // start dummy participants + for (String node : _nodes) { + MockParticipantManager participant = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, node); + participant.syncStart(); + _participants.add(participant); + } + + // start controller + String controllerName = CONTROLLER_PREFIX + "_0"; + _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName); + _controller.syncStart(); + + enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true); + enableTopologyAwareRebalance(_gZkClient, CLUSTER_NAME, true); + } + + protected void addInstanceConfig(String storageNodeName, int seqNo, int zoneCount, int tagCount) { + _gSetupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName); + String zone = "zone-" + seqNo % zoneCount; + String tag = "tag-" + seqNo % tagCount; + _gSetupTool.getClusterManagementTool().setInstanceZoneId(CLUSTER_NAME, storageNodeName, zone); + _gSetupTool.getClusterManagementTool().addInstanceTag(CLUSTER_NAME, storageNodeName, tag); + _nodeToZoneMap.put(storageNodeName, zone); + _nodeToTagMap.put(storageNodeName, tag); + _nodes.add(storageNodeName); + } + + @Test + public void testZoneIsolation() throws Exception { + int i = 0; + for (String stateModel : _testModels) { + String db = "Test-DB-" + i++; + createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica, + _replica, -1); + _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica); + _allDBs.add(db); + } + Thread.sleep(300); + + validate(_replica); + } + + @Test + public void testZoneIsolationWithInstanceTag() throws Exception { + Set tags = new HashSet(_nodeToTagMap.values()); + int i = 0; + for (String tag : tags) { + String db = "Test-DB-" + i++; + createResourceWithWagedRebalance(CLUSTER_NAME, db, + 
BuiltInStateModelDefinitions.MasterSlave.name(), PARTITIONS, _replica, _replica, -1); + IdealState is = + _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); + is.setInstanceGroupTag(tag); + _gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, db, is); + _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica); + _allDBs.add(db); + } + Thread.sleep(300); + + validate(_replica); + } + + @Test(dependsOnMethods = { "testZoneIsolation", "testZoneIsolationWithInstanceTag" }) + public void testLackEnoughLiveRacks() throws Exception { + // shutdown participants within one zone + String zone = _nodeToZoneMap.values().iterator().next(); + for (int i = 0; i < _participants.size(); i++) { + MockParticipantManager p = _participants.get(i); + if (_nodeToZoneMap.get(p.getInstanceName()).equals(zone)) { + p.syncStop(); + } + } + + int j = 0; + for (String stateModel : _testModels) { + String db = "Test-DB-" + j++; + createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica, + _replica, -1); + _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica); + _allDBs.add(db); + } + Thread.sleep(300); + // Since the WAGED rebalancer does not do partial rebalance, the initial assignment won't show. 
+ Assert.assertFalse(TestHelper.verify(() -> _allDBs.stream().allMatch(db -> { + ExternalView ev = + _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); + return ev != null && !ev.getPartitionSet().isEmpty(); + }), 2000)); + + // restart the participants within the zone + for (int i = 0; i < _participants.size(); i++) { + MockParticipantManager p = _participants.get(i); + if (_nodeToZoneMap.get(p.getInstanceName()).equals(zone)) { + MockParticipantManager newNode = + new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, p.getInstanceName()); + _participants.set(i, newNode); + newNode.syncStart(); + } + } + + Thread.sleep(300); + // Verify if the partitions get assigned + validate(_replica); + + // TODO: Revisit the logic that the rebalancer handles fault zone. + // TODO: For now, if node is less than replica count, the rebalancer will adjust automatically. + // TODO: However, if the fault zone is less than replica count, calculation will fail. +/* for (int i = 0; i < _participants.size(); i++) { + MockParticipantManager p = _participants.get(i); + if (_nodeToZoneMap.get(p.getInstanceName()).equals(zone)) { + p.syncStop(); + } + } + + Thread.sleep(300); + + for (String db : _allDBs) { + IdealState is = + _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); + ExternalView ev = + _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); + validateZoneAndTagIsolation(is, ev, 2); + }*/ + } + + @Test(dependsOnMethods = { "testLackEnoughLiveRacks" }) + public void testLackEnoughRacks() throws Exception { + // shutdown participants within one zone + String zone = _nodeToZoneMap.values().iterator().next(); + for (int i = 0; i < _participants.size(); i++) { + MockParticipantManager p = _participants.get(i); + if (_nodeToZoneMap.get(p.getInstanceName()).equals(zone)) { + p.syncStop(); + _gSetupTool.getClusterManagementTool() + .enableInstance(CLUSTER_NAME, p.getInstanceName(), false); + 
Thread.sleep(50); + _gSetupTool.dropInstanceFromCluster(CLUSTER_NAME, p.getInstanceName()); + } + } + + int j = 0; + for (String stateModel : _testModels) { + String db = "Test-DB-" + j++; + createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica, + _replica, -1); + _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica); + _allDBs.add(db); + } + Thread.sleep(300); + // Since the WAGED rebalancer does not do partial rebalance, the initial assignment won't show. + Assert.assertFalse(TestHelper.verify(() -> _allDBs.stream().allMatch(db -> { + ExternalView ev = + _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); + return ev != null && !ev.getPartitionSet().isEmpty(); + }), 2000)); + + // Create new participants within the zone + int nodeCount = _participants.size(); + for (int i = 0; i < nodeCount; i++) { + MockParticipantManager p = _participants.get(i); + if (_nodeToZoneMap.get(p.getInstanceName()).equals(zone)) { + String replaceNodeName = p.getInstanceName() + "-replacement_" + START_PORT; + addInstanceConfig(replaceNodeName, i, ZONES, TAGS); + MockParticipantManager newNode = + new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, replaceNodeName); + _participants.set(i, newNode); + newNode.syncStart(); + } + } + + Thread.sleep(300); + // Verify if the partitions get assigned + validate(_replica); + + // TODO: Revisit the logic that the rebalancer handles fault zone. + // TODO: For now, if node is less than replica count, the rebalancer will adjust automatically. + // TODO: However, if the fault zone is less than replica count, calculation will fail. 
+/* for (int i = 0; i < _participants.size(); i++) { + MockParticipantManager p = _participants.get(i); + if (_nodeToZoneMap.get(p.getInstanceName()).equals(zone)) { + p.syncStop(); + } + } + + Thread.sleep(300); + + for (String db : _allDBs) { + IdealState is = + _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); + ExternalView ev = + _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); + validateZoneAndTagIsolation(is, ev, 2); + }*/ + } + + @Test(dependsOnMethods = { "testZoneIsolation", "testZoneIsolationWithInstanceTag" }) + public void testAddZone() throws Exception { + int i = 0; + for (String stateModel : _testModels) { + String db = "Test-DB-" + i++; + createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica, + _replica, -1); + _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica); + _allDBs.add(db); + } + Thread.sleep(300); + + validate(_replica); + + // Create new participants within the a new zone + Set newNodes = new HashSet<>(); + Map newNodeReplicaCount = new HashMap<>(); + + try { + int nodeCount = 2; + for (int j = 0; j < nodeCount; j++) { + String newNodeName = "new-zone-node-" + j + "_" + START_PORT; + // Add all new node to the new zone + addInstanceConfig(newNodeName, -1, ZONES + 1, TAGS); + MockParticipantManager newNode = + new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, newNodeName); + newNode.syncStart(); + newNodes.add(newNode); + newNodeReplicaCount.put(newNodeName, 0); + } + Thread.sleep(300); + + validate(_replica); + + // The new zone nodes shall have some assignments + for (String db : _allDBs) { + IdealState is = + _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); + ExternalView ev = + _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); + validateZoneAndTagIsolation(is, ev, _replica); + for (String partition : ev.getPartitionSet()) { + Map stateMap = ev.getStateMap(partition); + 
for (String node : stateMap.keySet()) { + if (newNodeReplicaCount.containsKey(node)) { + newNodeReplicaCount.computeIfPresent(node, (nodeName, replicaCount) -> replicaCount + 1); + } + } + } + } + Assert.assertTrue(newNodeReplicaCount.values().stream().allMatch(count -> count > 0)); + } finally { + for (MockParticipantManager p : newNodes) { + if (p != null && p.isConnected()) { + p.syncStop(); + } + } + } + } + + private void validate(int expectedReplica) { + ZkHelixClusterVerifier _clusterVerifier = + new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR) + .setResources(_allDBs).build(); + Assert.assertTrue(_clusterVerifier.verifyByPolling()); + + for (String db : _allDBs) { + IdealState is = + _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); + ExternalView ev = + _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); + validateZoneAndTagIsolation(is, ev, expectedReplica); + } + } + + /** + * Validate instances for each partition is on different zone and with necessary tagged instances. + */ + private void validateZoneAndTagIsolation(IdealState is, ExternalView ev, int expectedReplica) { + String tag = is.getInstanceGroupTag(); + for (String partition : is.getPartitionSet()) { + Set assignedZones = new HashSet(); + + Map assignmentMap = ev.getRecord().getMapField(partition); + Set instancesInEV = assignmentMap.keySet(); + // TODO: preference List is not persisted in IS. 
+ // Assert.assertEquals(instancesInEV, instancesInIs); + for (String instance : instancesInEV) { + assignedZones.add(_nodeToZoneMap.get(instance)); + if (tag != null) { + InstanceConfig config = + _gSetupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME, instance); + Assert.assertTrue(config.containsTag(tag)); + } + } + Assert.assertEquals(assignedZones.size(), expectedReplica); + } + } + + @AfterMethod + public void afterMethod() throws Exception { + for (String db : _allDBs) { + _gSetupTool.dropResourceFromCluster(CLUSTER_NAME, db); + } + _allDBs.clear(); + // waiting for all DB be dropped. + Thread.sleep(100); + ZkHelixClusterVerifier _clusterVerifier = + new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR) + .setResources(_allDBs).build(); + Assert.assertTrue(_clusterVerifier.verifyByPolling()); + } + + @AfterClass + public void afterClass() throws Exception { + /* + * shutdown order: 1) disconnect the controller 2) disconnect participants + */ + if (_controller != null && _controller.isConnected()) { + _controller.syncStop(); + } + for (MockParticipantManager p : _participants) { + if (p != null && p.isConnected()) { + p.syncStop(); + } + } + deleteCluster(CLUSTER_NAME); + System.out.println("END " + CLASS_NAME + " at " + new Date(System.currentTimeMillis())); + } +} diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalanceTopologyAware.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalanceTopologyAware.java new file mode 100644 index 0000000000..412fc8c1ff --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalanceTopologyAware.java @@ -0,0 +1,114 @@ +package org.apache.helix.integration.rebalancer.WagedRebalancer; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.Date; + +import org.apache.helix.ConfigAccessor; +import org.apache.helix.integration.manager.ClusterControllerManager; +import org.apache.helix.integration.manager.MockParticipantManager; +import org.apache.helix.model.ClusterConfig; +import org.apache.helix.model.InstanceConfig; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +public class TestWagedRebalanceTopologyAware extends TestWagedRebalanceFaultZone { + private static final String TOLOPOGY_DEF = "/DOMAIN/ZONE/INSTANCE"; + private static final String DOMAIN_NAME = "Domain"; + private static final String FAULT_ZONE = "ZONE"; + + protected final String CLASS_NAME = getShortClassName(); + protected final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME; + + @BeforeClass + public void beforeClass() throws Exception { + System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis())); + + _gSetupTool.addCluster(CLUSTER_NAME, true); + + ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient); + ClusterConfig clusterConfig = configAccessor.getClusterConfig(CLUSTER_NAME); + clusterConfig.setTopology(TOLOPOGY_DEF); + clusterConfig.setFaultZoneType(FAULT_ZONE); + configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig); + + 
for (int i = 0; i < NUM_NODE; i++) { + String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i); + addInstanceConfig(storageNodeName, i, ZONES, TAGS); + } + + // start dummy participants + for (String node : _nodes) { + MockParticipantManager participant = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, node); + participant.syncStart(); + _participants.add(participant); + } + + // start controller + String controllerName = CONTROLLER_PREFIX + "_0"; + _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName); + _controller.syncStart(); + + enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true); + enableTopologyAwareRebalance(_gZkClient, CLUSTER_NAME, true); + } + + protected void addInstanceConfig(String storageNodeName, int seqNo, int zoneCount, int tagCount) { + _gSetupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName); + String zone = "zone-" + seqNo % zoneCount; + String tag = "tag-" + seqNo % tagCount; + + InstanceConfig config = + _gSetupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME, storageNodeName); + config.setDomain( + String.format("DOMAIN=%s,ZONE=%s,INSTANCE=%s", DOMAIN_NAME, zone, storageNodeName)); + config.addTag(tag); + _gSetupTool.getClusterManagementTool().setInstanceConfig(CLUSTER_NAME, storageNodeName, config); + + _nodeToZoneMap.put(storageNodeName, zone); + _nodeToTagMap.put(storageNodeName, tag); + _nodes.add(storageNodeName); + } + + @Test + public void testZoneIsolation() throws Exception { + super.testZoneIsolation(); + } + + @Test + public void testZoneIsolationWithInstanceTag() throws Exception { + super.testZoneIsolationWithInstanceTag(); + } + + @Test(dependsOnMethods = { "testZoneIsolation", "testZoneIsolationWithInstanceTag" }) + public void testLackEnoughLiveRacks() throws Exception { + super.testLackEnoughLiveRacks(); + } + + @Test(dependsOnMethods = { "testLackEnoughLiveRacks" }) + public void testLackEnoughRacks() throws Exception { + 
super.testLackEnoughRacks(); + } + + @Test(dependsOnMethods = { "testZoneIsolation", "testZoneIsolationWithInstanceTag" }) + public void testAddZone() throws Exception { + super.testAddZone(); + } +} From 40ca652c10ee3e4a79a23fa0ce69f55a459357d9 Mon Sep 17 00:00:00 2001 From: jiajunwang Date: Tue, 17 Sep 2019 16:19:51 -0700 Subject: [PATCH 2/5] Fix tests after merge the code. --- .../WagedRebalancer/TestWagedRebalance.java | 3 +- .../TestWagedRebalanceFaultZone.java | 77 ++++++------------- 2 files changed, 25 insertions(+), 55 deletions(-) diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java index 67de90a376..fb5375c2c7 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java +++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java @@ -64,7 +64,8 @@ public class TestWagedRebalance extends ZkTestBase { private Set _allDBs = new HashSet<>(); private int _replica = 3; - private static String[] _testModels = { BuiltInStateModelDefinitions.OnlineOffline.name(), + private static String[] _testModels = { + BuiltInStateModelDefinitions.OnlineOffline.name(), BuiltInStateModelDefinitions.MasterSlave.name(), BuiltInStateModelDefinitions.LeaderStandby.name() }; diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalanceFaultZone.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalanceFaultZone.java index c8d97d098c..0b020dbebc 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalanceFaultZone.java +++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalanceFaultZone.java @@ -27,11 +27,12 @@ 
import java.util.Map; import java.util.Set; -import org.apache.helix.TestHelper; +import org.apache.helix.ConfigAccessor; import org.apache.helix.common.ZkTestBase; import org.apache.helix.integration.manager.ClusterControllerManager; import org.apache.helix.integration.manager.MockParticipantManager; import org.apache.helix.model.BuiltInStateModelDefinitions; +import org.apache.helix.model.ClusterConfig; import org.apache.helix.model.ExternalView; import org.apache.helix.model.IdealState; import org.apache.helix.model.InstanceConfig; @@ -160,12 +161,7 @@ public void testLackEnoughLiveRacks() throws Exception { _allDBs.add(db); } Thread.sleep(300); - // Since the WAGED rebalancer does not do partial rebalance, the initial assignment won't show. - Assert.assertFalse(TestHelper.verify(() -> _allDBs.stream().allMatch(db -> { - ExternalView ev = - _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); - return ev != null && !ev.getPartitionSet().isEmpty(); - }), 2000)); + validate(2); // restart the participants within the zone for (int i = 0; i < _participants.size(); i++) { @@ -181,26 +177,6 @@ public void testLackEnoughLiveRacks() throws Exception { Thread.sleep(300); // Verify if the partitions get assigned validate(_replica); - - // TODO: Revisit the logic that the rebalancer handles fault zone. - // TODO: For now, if node is less than replica count, the rebalancer will adjust automatically. - // TODO: However, if the fault zone is less than replica count, calculation will fail. 
-/* for (int i = 0; i < _participants.size(); i++) { - MockParticipantManager p = _participants.get(i); - if (_nodeToZoneMap.get(p.getInstanceName()).equals(zone)) { - p.syncStop(); - } - } - - Thread.sleep(300); - - for (String db : _allDBs) { - IdealState is = - _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); - ExternalView ev = - _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); - validateZoneAndTagIsolation(is, ev, 2); - }*/ } @Test(dependsOnMethods = { "testLackEnoughLiveRacks" }) @@ -227,12 +203,7 @@ public void testLackEnoughRacks() throws Exception { _allDBs.add(db); } Thread.sleep(300); - // Since the WAGED rebalancer does not do partial rebalance, the initial assignment won't show. - Assert.assertFalse(TestHelper.verify(() -> _allDBs.stream().allMatch(db -> { - ExternalView ev = - _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); - return ev != null && !ev.getPartitionSet().isEmpty(); - }), 2000)); + validate(2); // Create new participants within the zone int nodeCount = _participants.size(); @@ -251,26 +222,6 @@ public void testLackEnoughRacks() throws Exception { Thread.sleep(300); // Verify if the partitions get assigned validate(_replica); - - // TODO: Revisit the logic that the rebalancer handles fault zone. - // TODO: For now, if node is less than replica count, the rebalancer will adjust automatically. - // TODO: However, if the fault zone is less than replica count, calculation will fail. 
-/* for (int i = 0; i < _participants.size(); i++) { - MockParticipantManager p = _participants.get(i); - if (_nodeToZoneMap.get(p.getInstanceName()).equals(zone)) { - p.syncStop(); - } - } - - Thread.sleep(300); - - for (String db : _allDBs) { - IdealState is = - _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); - ExternalView ev = - _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); - validateZoneAndTagIsolation(is, ev, 2); - }*/ } @Test(dependsOnMethods = { "testZoneIsolation", "testZoneIsolationWithInstanceTag" }) @@ -291,12 +242,22 @@ public void testAddZone() throws Exception { Set newNodes = new HashSet<>(); Map newNodeReplicaCount = new HashMap<>(); + ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient); + try { + // Configure the preference so as to allow movements. + ClusterConfig clusterConfig = configAccessor.getClusterConfig(CLUSTER_NAME); + Map preference = new HashMap<>(); + preference.put(ClusterConfig.GlobalRebalancePreferenceKey.EVENNESS, 10); + preference.put(ClusterConfig.GlobalRebalancePreferenceKey.LESS_MOVEMENT, 0); + clusterConfig.setGlobalRebalancePreference(preference); + configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig); + int nodeCount = 2; for (int j = 0; j < nodeCount; j++) { String newNodeName = "new-zone-node-" + j + "_" + START_PORT; // Add all new node to the new zone - addInstanceConfig(newNodeName, -1, ZONES + 1, TAGS); + addInstanceConfig(newNodeName, j, ZONES + 1, TAGS); MockParticipantManager newNode = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, newNodeName); newNode.syncStart(); @@ -325,6 +286,14 @@ public void testAddZone() throws Exception { } Assert.assertTrue(newNodeReplicaCount.values().stream().allMatch(count -> count > 0)); } finally { + // Revert the preference + ClusterConfig clusterConfig = configAccessor.getClusterConfig(CLUSTER_NAME); + Map preference = new HashMap<>(); + 
preference.put(ClusterConfig.GlobalRebalancePreferenceKey.EVENNESS, 1); + preference.put(ClusterConfig.GlobalRebalancePreferenceKey.LESS_MOVEMENT, 1); + clusterConfig.setGlobalRebalancePreference(preference); + configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig); + // Stop the new nodes for (MockParticipantManager p : newNodes) { if (p != null && p.isConnected()) { p.syncStop(); From 00f4f93f7760fd350f7681c9b995b74cf1f27b9b Mon Sep 17 00:00:00 2001 From: jiajunwang Date: Tue, 17 Sep 2019 16:23:00 -0700 Subject: [PATCH 3/5] Minor format fix. --- .../helix/controller/stages/BestPossibleStateCalcStage.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java index 452631ca8c..8ce60e702f 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java @@ -274,8 +274,9 @@ private Map computeResourceBestPossibleStateWithWagedRebalance updateBestPossibleStateOutput(output, resource, is); } else { failureResources.add(resource.getResourceName()); - LogUtil.logWarn(logger, _eventId, - "Failed to calculate best possible states for " + resource.getResourceName()); + LogUtil.logWarn(logger, _eventId, String + .format("Failed to calculate best possible states for %s.", + resource.getResourceName())); } } return wagedRebalancedResourceMap; From 2dd9c0fccba3f14aace63fb2da0012d700c53b3b Mon Sep 17 00:00:00 2001 From: jiajunwang Date: Tue, 17 Sep 2019 16:43:33 -0700 Subject: [PATCH 4/5] Fix a typo in the replica ordering. This logic will be tested in the following test PR. 
--- .../waged/constraints/ConstraintBasedAlgorithm.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java index de6f23c106..1a41aef501 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java @@ -166,8 +166,9 @@ private List getOrderedAssignableReplica(ClusterModel cluster return resourceName1.compareTo(resourceName2); } } else { - // Note we shall prioritize the replica with a higher state priority - return statePriority2 - statePriority1; + // Note we shall prioritize the replica with a higher state priority, + // the smaller priority number means higher priority. + return statePriority1 - statePriority2; } } else { // If the baseline assignment contains the assignment, prioritize the replica. From c705cee31fc8c81088c0eff7f1571c68de2b132d Mon Sep 17 00:00:00 2001 From: jiajunwang Date: Wed, 18 Sep 2019 13:40:31 -0700 Subject: [PATCH 5/5] Address comments. --- .../src/main/java/org/apache/helix/HelixRebalanceException.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/helix-core/src/main/java/org/apache/helix/HelixRebalanceException.java b/helix-core/src/main/java/org/apache/helix/HelixRebalanceException.java index b90a7d80cb..d54853f96c 100644 --- a/helix-core/src/main/java/org/apache/helix/HelixRebalanceException.java +++ b/helix-core/src/main/java/org/apache/helix/HelixRebalanceException.java @@ -23,6 +23,8 @@ * Exception thrown by Helix due to rebalance failures. 
*/ public class HelixRebalanceException extends Exception { + // TODO: Adding static description or other necessary fields into the enum instances for + // TODO: supporting the rebalance monitor to understand the exception. public enum Type { INVALID_CLUSTER_STATUS, INVALID_REBALANCER_STATUS,