Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,10 @@ public class AssignableNode implements Comparable<AssignableNode> {
* @param clusterConfig
* @param instanceConfig
* @param instanceName
* @param existingAssignment A collection of replicas that have been pre-allocated to the node.
*/
AssignableNode(ClusterConfig clusterConfig, InstanceConfig instanceConfig, String instanceName,
Collection<AssignableReplica> existingAssignment) {
AssignableNode(ClusterConfig clusterConfig, InstanceConfig instanceConfig, String instanceName) {
_instanceName = instanceName;
refresh(clusterConfig, instanceConfig, existingAssignment);
refresh(clusterConfig, instanceConfig);
}

private void reset() {
Expand All @@ -88,10 +86,8 @@ private void reset() {
* subject to change. If the assumption is no longer true, this function should become private.
* @param clusterConfig - the Cluster Config of the cluster where the node is located
* @param instanceConfig - the Instance Config of the node
* @param existingAssignment - all the existing replicas that are current assigned to the node
*/
private void refresh(ClusterConfig clusterConfig, InstanceConfig instanceConfig,
Collection<AssignableReplica> existingAssignment) {
private void refresh(ClusterConfig clusterConfig, InstanceConfig instanceConfig) {
reset();

Map<String, Integer> instanceCapacity = fetchInstanceCapacity(clusterConfig, instanceConfig);
Expand All @@ -101,8 +97,29 @@ private void refresh(ClusterConfig clusterConfig, InstanceConfig instanceConfig,
_disabledPartitionsMap = instanceConfig.getDisabledPartitionsMap();
_maxCapacity = instanceCapacity;
_maxPartition = clusterConfig.getMaxPartitionsPerInstance();
}

/**
* This function should only be used to assign a set of new partitions that are not allocated on
* this node.
* Using this function avoids the overhead of updating capacity repeatedly.
*/
void assignNewBatch(Collection<AssignableReplica> replicas) {
Map<String, Integer> totalPartitionCapacity = new HashMap<>();
for (AssignableReplica replica : replicas) {
addToAssignmentRecord(replica);
// increment the capacity requirement according to partition's capacity configuration.
for (Map.Entry<String, Integer> capacity : replica.getCapacity().entrySet()) {
totalPartitionCapacity.compute(capacity.getKey(),
(key, totalValue) -> (totalValue == null) ? capacity.getValue()
: totalValue + capacity.getValue());
}
}

assignNewBatch(existingAssignment);
// Update the global state after all single replications' calculation is done.
for (String key : totalPartitionCapacity.keySet()) {
updateCapacityAndUtilization(key, totalPartitionCapacity.get(key));
}
}

/**
Expand Down Expand Up @@ -314,29 +331,6 @@ private String computeFaultZone(ClusterConfig clusterConfig, InstanceConfig inst
}
}

/**
* This function should only be used to assign a set of new partitions that are not allocated on
* this node.
* Using this function avoids the overhead of updating capacity repeatedly.
*/
private void assignNewBatch(Collection<AssignableReplica> replicas) {
Map<String, Integer> totalPartitionCapacity = new HashMap<>();
for (AssignableReplica replica : replicas) {
addToAssignmentRecord(replica);
// increment the capacity requirement according to partition's capacity configuration.
for (Map.Entry<String, Integer> capacity : replica.getCapacity().entrySet()) {
totalPartitionCapacity.compute(capacity.getKey(),
(key, totalValue) -> (totalValue == null) ? capacity.getValue()
: totalValue + capacity.getValue());
}
}

// Update the global state after all single replications' calculation is done.
for (String key : totalPartitionCapacity.keySet()) {
updateCapacityAndUtilization(key, totalPartitionCapacity.get(key));
}
}

/**
* @throws HelixException if the replica has already been assigned to the node.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,15 @@ public static ClusterModel generateClusterModel(ResourceControllerDataProvider d
Map<HelixConstants.ChangeType, Set<String>> clusterChanges,
Map<String, ResourceAssignment> baselineAssignment,
Map<String, ResourceAssignment> bestPossibleAssignment) {
// Construct all the assignable nodes and initialize with the allocated replicas.
Set<AssignableNode> assignableNodes =
parseAllNodes(dataProvider.getClusterConfig(), dataProvider.getInstanceConfigMap(),
activeInstances);

// Generate replica objects for all the resource partitions.
// <resource, replica set>
Map<String, Set<AssignableReplica>> replicaMap =
parseAllReplicas(dataProvider, resourceMap, activeInstances.size());
parseAllReplicas(dataProvider, resourceMap, assignableNodes);

// Check if the replicas need to be reassigned.
Map<String, Set<AssignableReplica>> allocatedReplicas =
Expand All @@ -74,10 +79,9 @@ public static ClusterModel generateClusterModel(ResourceControllerDataProvider d
findToBeAssignedReplicas(replicaMap, clusterChanges, activeInstances,
bestPossibleAssignment, allocatedReplicas);

// Construct all the assignable nodes and initialize with the allocated replicas.
Set<AssignableNode> assignableNodes =
parseAllNodes(dataProvider.getClusterConfig(), dataProvider.getInstanceConfigMap(),
activeInstances, allocatedReplicas);
// Update the allocated replicas to the assignable nodes.
assignableNodes.stream().forEach(node -> node.assignNewBatch(
allocatedReplicas.getOrDefault(node.getInstanceName(), Collections.emptySet())));
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can we simplify just using get method, it's because all information is in the same method and you should know if exists or not.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not necessary. If the node is newly added, there won't an existing assignment. And allocatedReplicas.get(node.getInstanceName()) will be null.


// Construct and initialize cluster context.
ClusterContext context = new ClusterContext(
Expand Down Expand Up @@ -171,15 +175,13 @@ private static Set<AssignableReplica> findToBeAssignedReplicas(
* @param clusterConfig The cluster configuration.
* @param instanceConfigMap A map of all the instance configuration.
* @param activeInstances All the instances that are online and enabled.
* @param allocatedReplicas A map of all the assigned replicas, which will not be reassigned during the rebalance.
* @return A map of assignable node set, <InstanceName, node set>.
*/
private static Set<AssignableNode> parseAllNodes(ClusterConfig clusterConfig,
Map<String, InstanceConfig> instanceConfigMap, Set<String> activeInstances,
Map<String, Set<AssignableReplica>> allocatedReplicas) {
Map<String, InstanceConfig> instanceConfigMap, Set<String> activeInstances) {
return activeInstances.stream().map(
instanceName -> new AssignableNode(clusterConfig, instanceConfigMap.get(instanceName),
instanceName, allocatedReplicas.getOrDefault(instanceName, Collections.emptySet())))
instanceName))
.collect(Collectors.toSet());
}

Expand All @@ -188,11 +190,12 @@ private static Set<AssignableNode> parseAllNodes(ClusterConfig clusterConfig,
*
* @param dataProvider The cluster status cache that contains the current cluster status.
* @param resourceMap All the valid resources that are managed by the rebalancer.
* @param assignableNodes All the active assignable nodes.
* @return A map of assignable replica set, <ResourceName, replica set>.
*/
private static Map<String, Set<AssignableReplica>> parseAllReplicas(
ResourceControllerDataProvider dataProvider, Map<String, Resource> resourceMap,
int instanceCount) {
Set<AssignableNode> assignableNodes) {
Map<String, Set<AssignableReplica>> totalReplicaMap = new HashMap<>();
ClusterConfig clusterConfig = dataProvider.getClusterConfig();

Expand All @@ -211,8 +214,11 @@ private static Map<String, Set<AssignableReplica>> parseAllReplicas(
is.getStateModelDefRef(), resourceName));
}

int activeFaultZoneCount =
assignableNodes.stream().map(node -> node.getFaultZone()).collect(Collectors.toSet())
.size();
Map<String, Integer> stateCountMap =
def.getStateCountMap(instanceCount, is.getReplicaCount(instanceCount));
def.getStateCountMap(activeFaultZoneCount, is.getReplicaCount(assignableNodes.size()));

for (String partition : is.getPartitionSet()) {
for (Map.Entry<String, Integer> entry : stateCountMap.entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,7 @@ private Set<AssignableNode> generateNodes(ResourceControllerDataProvider testCac
Set<AssignableNode> nodeSet = new HashSet<>();
testCache.getInstanceConfigMap().values().stream()
.forEach(config -> nodeSet.add(new AssignableNode(testCache.getClusterConfig(),
testCache.getInstanceConfigMap().get(_testInstanceId), config.getInstanceName(),
Collections.emptyList())));
testCache.getInstanceConfigMap().get(_testInstanceId), config.getInstanceName())));
return nodeSet;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ public void testNormalUsage() throws IOException {
expectedCapacityMap.put("item3", 30);

AssignableNode assignableNode = new AssignableNode(testCache.getClusterConfig(),
testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId, assignmentSet);
testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId);
assignableNode.assignNewBatch(assignmentSet);
Assert.assertEquals(assignableNode.getAssignedPartitionsMap(), expectedAssignment);
Assert.assertEquals(assignableNode.getAssignedReplicaCount(), 4);
Assert.assertEquals(assignableNode.getHighestCapacityUtilization(), 16.0 / 20.0, 0.005);
Expand Down Expand Up @@ -167,8 +168,7 @@ public void testReleaseNoPartition() throws IOException {
ResourceControllerDataProvider testCache = setupClusterDataCache();

AssignableNode assignableNode = new AssignableNode(testCache.getClusterConfig(),
testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId,
Collections.emptyList());
testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId);
AssignableReplica removingReplica = new AssignableReplica(testCache.getClusterConfig(),
testCache.getResourceConfig(_resourceNames.get(1)), _partitionNames.get(2) + "non-exist",
"MASTER", 1);
Expand All @@ -183,7 +183,8 @@ public void testAssignDuplicateReplica() throws IOException {
Set<AssignableReplica> assignmentSet = generateReplicas(testCache);

AssignableNode assignableNode = new AssignableNode(testCache.getClusterConfig(),
testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId, assignmentSet);
testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId);
assignableNode.assignNewBatch(assignmentSet);
AssignableReplica duplicateReplica = new AssignableReplica(testCache.getClusterConfig(),
testCache.getResourceConfig(_resourceNames.get(0)), _partitionNames.get(0), "SLAVE", 2);
assignableNode.assign(duplicateReplica);
Expand All @@ -206,8 +207,7 @@ public void testParseFaultZoneNotFound() throws IOException {
when(testCache.getInstanceConfigMap()).thenReturn(instanceConfigMap);

new AssignableNode(testCache.getClusterConfig(),
testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId,
Collections.emptyList());
testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId);
}

@Test
Expand All @@ -227,8 +227,7 @@ public void testParseFaultZone() throws IOException {
when(testCache.getInstanceConfigMap()).thenReturn(instanceConfigMap);

AssignableNode assignableNode = new AssignableNode(testCache.getClusterConfig(),
testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId,
Collections.emptyList());
testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId);

Assert.assertEquals(assignableNode.getFaultZone(), "2/");

Expand All @@ -245,8 +244,7 @@ public void testParseFaultZone() throws IOException {
when(testCache.getInstanceConfigMap()).thenReturn(instanceConfigMap);

assignableNode = new AssignableNode(testCache.getClusterConfig(),
testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId,
Collections.emptyList());
testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId);

Assert.assertEquals(assignableNode.getFaultZone(), "2/testInstance/");
}
Expand All @@ -259,8 +257,7 @@ public void testDefaultInstanceCapacity() {
InstanceConfig testInstanceConfig = new InstanceConfig("testInstanceConfigId");

AssignableNode assignableNode =
new AssignableNode(testClusterConfig, testInstanceConfig, _testInstanceId,
Collections.emptyList());
new AssignableNode(testClusterConfig, testInstanceConfig, _testInstanceId);
Assert.assertEquals(assignableNode.getMaxCapacity(), _capacityDataMap);
}

Expand All @@ -274,7 +271,6 @@ public void testIncompleteInstanceCapacity() {
InstanceConfig testInstanceConfig = new InstanceConfig("testInstanceConfigId");
testInstanceConfig.setInstanceCapacityMap(_capacityDataMap);

new AssignableNode(testClusterConfig, testInstanceConfig, _testInstanceId,
Collections.emptyList());
new AssignableNode(testClusterConfig, testInstanceConfig, _testInstanceId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,7 @@ Set<AssignableNode> generateNodes(ResourceControllerDataProvider testCache) {
Set<AssignableNode> nodeSet = new HashSet<>();
testCache.getInstanceConfigMap().values().stream().forEach(config -> nodeSet.add(
new AssignableNode(testCache.getClusterConfig(),
testCache.getInstanceConfigMap().get(_testInstanceId), config.getInstanceName(),
Collections.emptyList())));
testCache.getInstanceConfigMap().get(_testInstanceId), config.getInstanceName())));
return nodeSet;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,14 @@
* under the License.
*/

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.helix.HelixConstants;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
Expand All @@ -34,14 +42,6 @@
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -111,7 +111,18 @@ public void testGenerateClusterModel() throws IOException {
Assert.assertEquals(
clusterModel.getAssignableNodes().values().stream().map(AssignableNode::getInstanceName)
.collect(Collectors.toSet()), _instances);
// Shall have 2 resources and 12 replicas
// Shall have 2 resources and 4 replicas, since all nodes are in the same fault zone.
Assert.assertEquals(clusterModel.getAssignableReplicaMap().size(), 2);
Assert.assertTrue(clusterModel.getAssignableReplicaMap().values().stream()
.allMatch(replicaSet -> replicaSet.size() == 4));

// Adjust instance fault zone, so they have different fault zones.
testCache.getInstanceConfigMap().values().stream()
.forEach(config -> config.setZoneId(config.getInstanceName()));
clusterModel = ClusterModelProvider.generateClusterModel(testCache, _resourceNames.stream()
.collect(Collectors.toMap(resource -> resource, resource -> new Resource(resource))),
_instances, Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap());
// Shall have 2 resources and 12 replicas after fault zone adjusted.
Assert.assertEquals(clusterModel.getAssignableReplicaMap().size(), 2);
Assert.assertTrue(clusterModel.getAssignableReplicaMap().values().stream()
.allMatch(replicaSet -> replicaSet.size() == 12));
Expand Down Expand Up @@ -197,10 +208,10 @@ public void testGenerateClusterModel() throws IOException {
_instances, Collections.singletonMap(HelixConstants.ChangeType.RESOURCE_CONFIG,
Collections.singleton(changedResourceName)), Collections.emptyMap(),
bestPossibleAssignment);
// There should be no existing assignment for all the resource except for resource2.
// There should be no existing assignment for all the resource except for resource2
Assert.assertEquals(clusterModel.getContext().getAssignmentForFaultZoneMap().size(), 1);
Map<String, Set<String>> resourceAssignmentMap =
clusterModel.getContext().getAssignmentForFaultZoneMap().get(_testFaultZoneId);
clusterModel.getContext().getAssignmentForFaultZoneMap().get(_testInstanceId);
// Should be only resource2 in the map
Assert.assertEquals(resourceAssignmentMap.size(), 1);
for (String resource : _resourceNames) {
Expand Down