diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/FaultZoneAwareConstraint.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/FaultZoneAwareConstraint.java new file mode 100644 index 0000000000..c33419e38b --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/FaultZoneAwareConstraint.java @@ -0,0 +1,43 @@ +package org.apache.helix.controller.rebalancer.waged.constraints; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.helix.controller.rebalancer.waged.model.AssignableNode; +import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica; +import org.apache.helix.controller.rebalancer.waged.model.ClusterContext; + +class FaultZoneAwareConstraint extends HardConstraint { + + @Override + boolean isAssignmentValid(AssignableNode node, AssignableReplica replica, + ClusterContext clusterContext) { + if (!node.hasFaultZone()) { + return true; + } + return !clusterContext + .getPartitionsForResourceAndFaultZone(replica.getResourceName(), node.getFaultZone()) + .contains(replica.getPartitionName()); + } + + @Override + String getDescription() { + return "A fault zone cannot contain more than 1 replica of same partition"; + } +} diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/NodeCapacityConstraint.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/NodeCapacityConstraint.java new file mode 100644 index 0000000000..5fc2faffd5 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/NodeCapacityConstraint.java @@ -0,0 +1,50 @@ +package org.apache.helix.controller.rebalancer.waged.constraints; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.Map; + +import org.apache.helix.controller.rebalancer.waged.model.AssignableNode; +import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica; +import org.apache.helix.controller.rebalancer.waged.model.ClusterContext; + +class NodeCapacityConstraint extends HardConstraint { + + @Override + boolean isAssignmentValid(AssignableNode node, AssignableReplica replica, + ClusterContext clusterContext) { + Map nodeCapacity = node.getCurrentCapacity(); + Map replicaCapacity = replica.getCapacity(); + + for (String key : replicaCapacity.keySet()) { + if (nodeCapacity.containsKey(key)) { + if (nodeCapacity.get(key) < replicaCapacity.get(key)) { + return false; + } + } + } + return true; + } + + @Override + String getDescription() { + return "Node has insufficient capacity"; + } +} diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/NodeMaxPartitionLimitConstraint.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/NodeMaxPartitionLimitConstraint.java new file mode 100644 index 0000000000..9d0752be6c --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/NodeMaxPartitionLimitConstraint.java @@ -0,0 +1,40 @@ +package org.apache.helix.controller.rebalancer.waged.constraints; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.helix.controller.rebalancer.waged.model.AssignableNode; +import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica; +import org.apache.helix.controller.rebalancer.waged.model.ClusterContext; + +class NodeMaxPartitionLimitConstraint extends HardConstraint { + + @Override + boolean isAssignmentValid(AssignableNode node, AssignableReplica replica, + ClusterContext clusterContext) { + return node.getAssignedReplicaCount() < node.getMaxPartition() + && node.getAssignedPartitionsByResource(replica.getResourceName()).size() < replica + .getResourceMaxPartitionsPerInstance(); + } + + @Override + String getDescription() { + return "Cannot exceed the maximum number of partitions limitation on node"; + } +} diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ReplicaActivateConstraint.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ReplicaActivateConstraint.java new file mode 100644 index 0000000000..9152efef85 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ReplicaActivateConstraint.java @@ -0,0 +1,41 @@ +package org.apache.helix.controller.rebalancer.waged.constraints; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.List; + +import org.apache.helix.controller.rebalancer.waged.model.AssignableNode; +import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica; +import org.apache.helix.controller.rebalancer.waged.model.ClusterContext; + +class ReplicaActivateConstraint extends HardConstraint { + @Override + boolean isAssignmentValid(AssignableNode node, AssignableReplica replica, + ClusterContext clusterContext) { + List disabledPartitions = + node.getDisabledPartitionsMap().get(replica.getResourceName()); + return disabledPartitions == null || !disabledPartitions.contains(replica.getPartitionName()); + } + + @Override + String getDescription() { + return "Cannot assign the inactive replica"; + } +} diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/SamePartitionOnInstanceConstraint.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/SamePartitionOnInstanceConstraint.java new file mode 100644 index 0000000000..202e49af24 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/SamePartitionOnInstanceConstraint.java @@ -0,0 +1,39 @@ +package org.apache.helix.controller.rebalancer.waged.constraints; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.helix.controller.rebalancer.waged.model.AssignableNode; +import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica; +import org.apache.helix.controller.rebalancer.waged.model.ClusterContext; + +class SamePartitionOnInstanceConstraint extends HardConstraint { + + @Override + boolean isAssignmentValid(AssignableNode node, AssignableReplica replica, + ClusterContext clusterContext) { + return !node.getAssignedPartitionsByResource(replica.getResourceName()) + .contains(replica.getPartitionName()); + } + + @Override + String getDescription() { + return "Same partition of different states cannot co-exist in one instance"; + } +} diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ValidGroupTagConstraint.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ValidGroupTagConstraint.java new file mode 100644 index 0000000000..e31864fde1 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ValidGroupTagConstraint.java @@ -0,0 +1,41 @@ +package org.apache.helix.controller.rebalancer.waged.constraints; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.helix.controller.rebalancer.waged.model.AssignableNode; +import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica; +import org.apache.helix.controller.rebalancer.waged.model.ClusterContext; + +class ValidGroupTagConstraint extends HardConstraint { + @Override + boolean isAssignmentValid(AssignableNode node, AssignableReplica replica, + ClusterContext clusterContext) { + if (!replica.hasResourceInstanceGroupTag()) { + return true; + } + + return node.getInstanceTags().contains(replica.getResourceInstanceGroupTag()); + } + + @Override + String getDescription() { + return "Instance doesn't have the tag of the replica"; + } +} diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java index 4141d20d31..f25c2894fb 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java @@ -19,6 +19,8 @@ * under the License. */ +import static java.lang.Math.max; + import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -35,8 +37,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static java.lang.Math.max; - /** * This class represents a possible allocation of the replication. * Note that any usage updates to the AssignableNode are not thread safe. @@ -52,7 +52,8 @@ public class AssignableNode implements Comparable { private Map _maxCapacity; private int _maxPartition; // maximum number of the partitions that can be assigned to the node. - // A map of > that tracks the replicas assigned to the node. + // A map of > that tracks the replicas assigned to the + // node. private Map> _currentAssignedReplicaMap; // A map of that tracks the current available node capacity private Map _currentCapacityMap; @@ -78,13 +79,15 @@ private void reset() { } /** - * Update the node with a ClusterDataCache. This resets the current assignment and recalculates currentCapacity. - * NOTE: While this is required to be used in the constructor, this can also be used when the clusterCache needs to be - * refreshed. This is under the assumption that the capacity mappings of InstanceConfig and ResourceConfig could + * Update the node with a ClusterDataCache. This resets the current assignment and recalculates + * currentCapacity. + * NOTE: While this is required to be used in the constructor, this can also be used when the + * clusterCache needs to be + * refreshed. This is under the assumption that the capacity mappings of InstanceConfig and + * ResourceConfig could * subject to change. If the assumption is no longer true, this function should become private. - * - * @param clusterConfig - the Cluster Config of the cluster where the node is located - * @param instanceConfig - the Instance Config of the node + * @param clusterConfig - the Cluster Config of the cluster where the node is located + * @param instanceConfig - the Instance Config of the node * @param existingAssignment - all the existing replicas that are current assigned to the node */ private void refresh(ClusterConfig clusterConfig, InstanceConfig instanceConfig, @@ -104,7 +107,6 @@ private void refresh(ClusterConfig clusterConfig, InstanceConfig instanceConfig, /** * Assign a replica to the node. - * * @param assignableReplica - the replica to be assigned */ void assign(AssignableReplica assignableReplica) { @@ -116,7 +118,6 @@ void assign(AssignableReplica assignableReplica) { /** * Release a replica from the node. * If the replication is not on this node, the assignable node is not updated. - * * @param replica - the replica to be released */ void release(AssignableReplica replica) throws IllegalArgumentException { @@ -131,8 +132,8 @@ void release(AssignableReplica replica) throws IllegalArgumentException { } Map partitionMap = _currentAssignedReplicaMap.get(resourceName); - if (!partitionMap.containsKey(partitionName) || !partitionMap.get(partitionName) - .equals(replica)) { + if (!partitionMap.containsKey(partitionName) + || !partitionMap.get(partitionName).equals(replica)) { LOG.warn("Replica {} is not assigned to node {}. Ignore the release call.", replica.toString(), getInstanceName()); return; @@ -174,7 +175,8 @@ public Set getAssignedPartitionsByResource(String resource) { /** * @param resource Resource name - * @return A set of the current assigned replicas' partition names with the top state in the specified resource. + * @return A set of the current assigned replicas' partition names with the top state in the + * specified resource. */ public Set getAssignedTopStatePartitionsByResource(String resource) { return _currentAssignedReplicaMap.getOrDefault(resource, Collections.emptyMap()).entrySet() @@ -194,7 +196,7 @@ public long getAssignedTopStatePartitionsCount() { /** * @return The total count of assigned replicas. */ - public long getAssignedReplicaCount() { + public int getAssignedReplicaCount() { return _currentAssignedReplicaMap.values().stream().mapToInt(Map::size).sum(); } @@ -207,7 +209,8 @@ public Map getCurrentCapacity() { /** * Return the most concerning capacity utilization number for evenly partition assignment. - * The method dynamically returns the highest utilization number among all the capacity categories. + * The method dynamically returns the highest utilization number among all the capacity + * categories. * For example, if the current node usage is {CPU: 0.9, MEM: 0.4, DISK: 0.6}. Then this call shall * return 0.9. * @@ -229,15 +232,21 @@ public String getFaultZone() { return _faultZone; } + public boolean hasFaultZone() { + return _faultZone != null; + } + /** - * @return A map of contains all the partitions that are disabled on the node. + * @return A map of contains all the partitions that are + * disabled on the node. */ public Map> getDisabledPartitionsMap() { return _disabledPartitionsMap; } /** - * @return A map of that describes the max capacity of the node. + * @return A map of that describes the max capacity of the + * node. */ public Map getMaxCapacity() { return _maxCapacity; @@ -251,8 +260,10 @@ public int getMaxPartition() { } /** - * Computes the fault zone id based on the domain and fault zone type when topology is enabled. For example, when - * the domain is "zone=2, instance=testInstance" and the fault zone type is "zone", this function returns "2". + * Computes the fault zone id based on the domain and fault zone type when topology is enabled. + * For example, when + * the domain is "zone=2, instance=testInstance" and the fault zone type is "zone", this function + * returns "2". * If cannot find the fault zone id, this function leaves the fault zone id as the instance name. * TODO merge this logic with Topology.java tree building logic. * For now, the WAGED rebalancer has a more strict topology def requirement. @@ -267,8 +278,8 @@ private String computeFaultZone(ClusterConfig clusterConfig, InstanceConfig inst } String[] topologyDef = topologyStr.trim().split("/"); - if (topologyDef.length == 0 || Arrays.stream(topologyDef) - .noneMatch(type -> type.equals(faultZoneType))) { + if (topologyDef.length == 0 + || Arrays.stream(topologyDef).noneMatch(type -> type.equals(faultZoneType))) { throw new HelixException( "The configured topology definition is empty or does not contain the fault zone type."); } @@ -304,7 +315,8 @@ private String computeFaultZone(ClusterConfig clusterConfig, InstanceConfig inst } /** - * This function should only be used to assign a set of new partitions that are not allocated on this node. + * This function should only be used to assign a set of new partitions that are not allocated on + * this node. * Using this function avoids the overhead of updating capacity repeatedly. */ private void assignNewBatch(Collection replicas) { @@ -314,9 +326,8 @@ private void assignNewBatch(Collection replicas) { // increment the capacity requirement according to partition's capacity configuration. for (Map.Entry capacity : replica.getCapacity().entrySet()) { totalPartitionCapacity.compute(capacity.getKey(), - (key, totalValue) -> (totalValue == null) ? - capacity.getValue() : - totalValue + capacity.getValue()); + (key, totalValue) -> (totalValue == null) ? capacity.getValue() + : totalValue + capacity.getValue()); } } @@ -332,12 +343,12 @@ private void assignNewBatch(Collection replicas) { private void addToAssignmentRecord(AssignableReplica replica) { String resourceName = replica.getResourceName(); String partitionName = replica.getPartitionName(); - if (_currentAssignedReplicaMap.containsKey(resourceName) && _currentAssignedReplicaMap - .get(resourceName).containsKey(partitionName)) { - throw new HelixException(String - .format("Resource %s already has a replica with state %s from partition %s on node %s", - replica.getResourceName(), replica.getReplicaState(), replica.getPartitionName(), - getInstanceName())); + if (_currentAssignedReplicaMap.containsKey(resourceName) + && _currentAssignedReplicaMap.get(resourceName).containsKey(partitionName)) { + throw new HelixException(String.format( + "Resource %s already has a replica with state %s from partition %s on node %s", + replica.getResourceName(), replica.getReplicaState(), replica.getPartitionName(), + getInstanceName())); } else { _currentAssignedReplicaMap.computeIfAbsent(resourceName, key -> new HashMap<>()) .put(partitionName, replica); diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java index 537bf70477..66bd7b774e 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java @@ -87,6 +87,10 @@ public String getResourceInstanceGroupTag() { return _resourceInstanceGroupTag; } + public boolean hasResourceInstanceGroupTag() { + return _resourceInstanceGroupTag != null && !_resourceInstanceGroupTag.isEmpty(); + } + public int getResourceMaxPartitionsPerInstance() { return _resourceMaxPartitionsPerInstance; } diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestFaultZoneAwareConstraint.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestFaultZoneAwareConstraint.java new file mode 100644 index 0000000000..9d2cb14e70 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestFaultZoneAwareConstraint.java @@ -0,0 +1,79 @@ +package org.apache.helix.controller.rebalancer.waged.constraints; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import static org.mockito.Mockito.when; + +import java.util.Collections; + +import org.apache.helix.controller.rebalancer.waged.model.AssignableNode; +import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica; +import org.apache.helix.controller.rebalancer.waged.model.ClusterContext; +import org.mockito.Mockito; +import org.testng.Assert; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import com.google.common.collect.ImmutableSet; + +public class TestFaultZoneAwareConstraint { + private static final String TEST_PARTITION = "testPartition"; + private static final String TEST_ZONE = "testZone"; + private static final String TEST_RESOURCE = "testResource"; + private final AssignableReplica _testReplica = Mockito.mock(AssignableReplica.class); + private final AssignableNode _testNode = Mockito.mock(AssignableNode.class); + private final ClusterContext _clusterContext = Mockito.mock(ClusterContext.class); + + private final HardConstraint _faultZoneAwareConstraint = new FaultZoneAwareConstraint(); + + @BeforeMethod + public void init() { + when(_testReplica.getResourceName()).thenReturn(TEST_RESOURCE); + when(_testReplica.getPartitionName()).thenReturn(TEST_PARTITION); + when(_testNode.getFaultZone()).thenReturn(TEST_ZONE); + } + + @Test + public void inValidWhenFaultZoneAlreadyAssigned() { + when(_testNode.hasFaultZone()).thenReturn(true); + when(_clusterContext.getPartitionsForResourceAndFaultZone(TEST_RESOURCE, TEST_ZONE)).thenReturn( + ImmutableSet.of(TEST_PARTITION)); + + Assert.assertFalse( + _faultZoneAwareConstraint.isAssignmentValid(_testNode, _testReplica, _clusterContext)); + } + + @Test + public void validWhenEmptyAssignment() { + when(_testNode.hasFaultZone()).thenReturn(true); + when(_clusterContext.getPartitionsForResourceAndFaultZone(TEST_RESOURCE, TEST_ZONE)).thenReturn(Collections.emptySet()); + + Assert.assertTrue( + _faultZoneAwareConstraint.isAssignmentValid(_testNode, _testReplica, _clusterContext)); + } + + @Test + public void validWhenNoFaultZone() { + when(_testNode.hasFaultZone()).thenReturn(false); + + Assert.assertTrue( + _faultZoneAwareConstraint.isAssignmentValid(_testNode, _testReplica, _clusterContext)); + } +} diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestNodeCapacityConstraint.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestNodeCapacityConstraint.java new file mode 100644 index 0000000000..511f881b3f --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestNodeCapacityConstraint.java @@ -0,0 +1,54 @@ +package org.apache.helix.controller.rebalancer.waged.constraints; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import static org.mockito.Mockito.when; + +import org.apache.helix.controller.rebalancer.waged.model.AssignableNode; +import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica; +import org.apache.helix.controller.rebalancer.waged.model.ClusterContext; +import org.mockito.Mockito; +import org.testng.Assert; +import org.testng.annotations.Test; + +import com.google.common.collect.ImmutableMap; + +public class TestNodeCapacityConstraint { + private final AssignableReplica _testReplica = Mockito.mock(AssignableReplica.class); + private final AssignableNode _testNode = Mockito.mock(AssignableNode.class); + private final ClusterContext _clusterContext = Mockito.mock(ClusterContext.class); + private final HardConstraint _constraint = new NodeCapacityConstraint(); + + @Test + public void testConstraintValidWhenNodeHasEnoughSpace() { + String key = "testKey"; + when(_testNode.getCurrentCapacity()).thenReturn(ImmutableMap.of(key, 10)); + when(_testReplica.getCapacity()).thenReturn(ImmutableMap.of(key, 5)); + Assert.assertTrue(_constraint.isAssignmentValid(_testNode, _testReplica, _clusterContext)); + } + + @Test + public void testConstraintInValidWhenNodeHasInsufficientSpace() { + String key = "testKey"; + when(_testNode.getCurrentCapacity()).thenReturn(ImmutableMap.of(key, 1)); + when(_testReplica.getCapacity()).thenReturn(ImmutableMap.of(key, 5)); + Assert.assertFalse(_constraint.isAssignmentValid(_testNode, _testReplica, _clusterContext)); + } +} diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestNodeMaxPartitionLimitConstraint.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestNodeMaxPartitionLimitConstraint.java new file mode 100644 index 0000000000..4cb7466283 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestNodeMaxPartitionLimitConstraint.java @@ -0,0 +1,56 @@ +package org.apache.helix.controller.rebalancer.waged.constraints; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import static org.mockito.Mockito.when; + +import java.util.Collections; + +import org.apache.helix.controller.rebalancer.waged.model.AssignableNode; +import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica; +import org.apache.helix.controller.rebalancer.waged.model.ClusterContext; +import org.mockito.Mockito; +import org.testng.Assert; +import org.testng.annotations.Test; + +public class TestNodeMaxPartitionLimitConstraint { + private static final String TEST_RESOURCE = "TestResource"; + private final AssignableReplica _testReplica = Mockito.mock(AssignableReplica.class); + private final AssignableNode _testNode = Mockito.mock(AssignableNode.class); + private final ClusterContext _clusterContext = Mockito.mock(ClusterContext.class); + private final HardConstraint _constraint = new NodeMaxPartitionLimitConstraint(); + + @Test + public void testConstraintValid() { + when(_testNode.getAssignedReplicaCount()).thenReturn(0); + when(_testNode.getMaxPartition()).thenReturn(10); + when(_testNode.getAssignedPartitionsByResource(TEST_RESOURCE)) + .thenReturn(Collections.emptySet()); + when(_testReplica.getResourceMaxPartitionsPerInstance()).thenReturn(5); + Assert.assertTrue(_constraint.isAssignmentValid(_testNode, _testReplica, _clusterContext)); + } + + @Test + public void testConstraintInvalid() { + when(_testNode.getAssignedReplicaCount()).thenReturn(10); + when(_testNode.getMaxPartition()).thenReturn(5); + Assert.assertFalse(_constraint.isAssignmentValid(_testNode, _testReplica, _clusterContext)); + } +} diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestPartitionActivateConstraint.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestPartitionActivateConstraint.java new file mode 100644 index 0000000000..ecfdaa2029 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestPartitionActivateConstraint.java @@ -0,0 +1,64 @@ +package org.apache.helix.controller.rebalancer.waged.constraints; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import static org.mockito.Mockito.when; + +import java.util.Collections; + +import org.apache.helix.controller.rebalancer.waged.model.AssignableNode; +import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica; +import org.apache.helix.controller.rebalancer.waged.model.ClusterContext; +import org.mockito.Mockito; +import org.testng.Assert; +import org.testng.annotations.Test; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; + +public class TestPartitionActivateConstraint { + private static final String TEST_PARTITION = "TestPartition"; + private static final String TEST_RESOURCE = "TestResource"; + private final AssignableReplica _testReplica = Mockito.mock(AssignableReplica.class); + private final AssignableNode _testNode = Mockito.mock(AssignableNode.class); + private final ClusterContext _clusterContext = Mockito.mock(ClusterContext.class); + private final HardConstraint _constraint = new ReplicaActivateConstraint(); + + @Test + public void testConstraintValid() { + when(_testReplica.getResourceName()).thenReturn(TEST_RESOURCE); + when(_testReplica.getPartitionName()).thenReturn(TEST_PARTITION); + when(_testNode.getDisabledPartitionsMap()) + .thenReturn(ImmutableMap.of(TEST_PARTITION, Collections.emptyList())); + Assert.assertTrue(_constraint.isAssignmentValid(_testNode, _testReplica, _clusterContext)); + when(_testNode.getDisabledPartitionsMap()) + .thenReturn(ImmutableMap.of(TEST_PARTITION, ImmutableList.of("dummy"))); + Assert.assertTrue(_constraint.isAssignmentValid(_testNode, _testReplica, _clusterContext)); + } + + @Test + public void testConstraintInvalidWhenReplicaIsDisabled() { + when(_testReplica.getResourceName()).thenReturn(TEST_RESOURCE); + when(_testReplica.getPartitionName()).thenReturn(TEST_PARTITION); + when(_testNode.getDisabledPartitionsMap()) + .thenReturn(ImmutableMap.of(TEST_PARTITION, ImmutableList.of(TEST_PARTITION))); + Assert.assertTrue(_constraint.isAssignmentValid(_testNode, _testReplica, _clusterContext)); + } +} diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestSamePartitionOnInstanceConstraint.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestSamePartitionOnInstanceConstraint.java new file mode 100644 index 0000000000..50b0c037be --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestSamePartitionOnInstanceConstraint.java @@ -0,0 +1,59 @@ +package org.apache.helix.controller.rebalancer.waged.constraints; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import static org.mockito.Mockito.when; + +import org.apache.helix.controller.rebalancer.waged.model.AssignableNode; +import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica; +import org.apache.helix.controller.rebalancer.waged.model.ClusterContext; +import org.mockito.Mockito; +import org.testng.Assert; +import org.testng.annotations.Test; + +import com.google.common.collect.ImmutableSet; + +public class TestSamePartitionOnInstanceConstraint { + private static final String TEST_RESOURCE = "TestResource"; + private static final String TEST_PARTITIOIN = TEST_RESOURCE + "0"; + private final AssignableReplica _testReplica = Mockito.mock(AssignableReplica.class); + private final AssignableNode _testNode = Mockito.mock(AssignableNode.class); + private final ClusterContext _clusterContext = Mockito.mock(ClusterContext.class); + private final HardConstraint _constraint = new SamePartitionOnInstanceConstraint(); + + @Test + public void testConstraintValid() { + when(_testNode.getAssignedPartitionsByResource(TEST_RESOURCE)) + .thenReturn(ImmutableSet.of("dummy")); + when(_testReplica.getResourceName()).thenReturn(TEST_RESOURCE); + when(_testReplica.getPartitionName()).thenReturn(TEST_PARTITIOIN); + + Assert.assertTrue(_constraint.isAssignmentValid(_testNode, _testReplica, _clusterContext)); + } + + @Test + public void testConstraintInValid() { + when(_testNode.getAssignedPartitionsByResource(TEST_RESOURCE)) + .thenReturn(ImmutableSet.of(TEST_PARTITIOIN)); + when(_testReplica.getResourceName()).thenReturn(TEST_RESOURCE); + when(_testReplica.getPartitionName()).thenReturn(TEST_PARTITIOIN); + Assert.assertFalse(_constraint.isAssignmentValid(_testNode, _testReplica, _clusterContext)); + } +} diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestValidGroupTagConstraint.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestValidGroupTagConstraint.java new file mode 100644 index 0000000000..8d02b3d801 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestValidGroupTagConstraint.java @@ -0,0 +1,66 @@ +package org.apache.helix.controller.rebalancer.waged.constraints; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import static org.mockito.Mockito.when; + +import java.util.Collections; + +import org.apache.helix.controller.rebalancer.waged.model.AssignableNode; +import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica; +import org.apache.helix.controller.rebalancer.waged.model.ClusterContext; +import org.mockito.Mockito; +import org.testng.Assert; +import org.testng.annotations.Test; + +import com.google.common.collect.ImmutableSet; + +public class TestValidGroupTagConstraint { + private static final String TEST_TAG = "testTag"; + private final AssignableReplica _testReplica = Mockito.mock(AssignableReplica.class); + private final AssignableNode _testNode = Mockito.mock(AssignableNode.class); + private final ClusterContext _clusterContext = Mockito.mock(ClusterContext.class); + private final HardConstraint _constraint = new ValidGroupTagConstraint(); + + @Test + public void testConstraintValid() { + when(_testReplica.hasResourceInstanceGroupTag()).thenReturn(true); + when(_testReplica.getResourceInstanceGroupTag()).thenReturn(TEST_TAG); + when(_testNode.getInstanceTags()).thenReturn(ImmutableSet.of(TEST_TAG)); + + Assert.assertTrue(_constraint.isAssignmentValid(_testNode, _testReplica, _clusterContext)); + } + + @Test + public void testConstraintInValid() { + when(_testReplica.hasResourceInstanceGroupTag()).thenReturn(true); + when(_testReplica.getResourceInstanceGroupTag()).thenReturn(TEST_TAG); + when(_testNode.getInstanceTags()).thenReturn(Collections.emptySet()); + + Assert.assertFalse(_constraint.isAssignmentValid(_testNode, _testReplica, _clusterContext)); + } + + @Test + public void testConstraintWhenReplicaHasNoTag() { + when(_testReplica.hasResourceInstanceGroupTag()).thenReturn(false); + + Assert.assertTrue(_constraint.isAssignmentValid(_testNode, _testReplica, _clusterContext)); + } +}