Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 107 additions & 22 deletions helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,21 @@
* under the License.
*/

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.helix.HelixException;
import org.apache.helix.HelixProperty;
import org.apache.helix.ZNRecord;
import org.apache.helix.api.config.HelixConfigProperty;
import org.apache.helix.api.config.StateTransitionThrottleConfig;
import org.apache.helix.api.config.StateTransitionTimeoutConfig;

import java.util.ArrayList;
Comment thread
jiajunwang marked this conversation as resolved.
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* Cluster configurations
*/
Expand Down Expand Up @@ -80,7 +82,19 @@ public enum ClusterConfigProperty {
DISABLED_INSTANCES,

// Specifies job types and used for quota allocation
QUOTA_TYPES
QUOTA_TYPES,

// The required instance capacity keys for resource partition assignment calculation.
INSTANCE_CAPACITY_KEYS,
// The preference of the rebalance result.
// EVENNESS - Evenness of the resource utilization, partition, and top state distribution.
// LESS_MOVEMENT - the tendency of keeping the current assignment instead of moving the partition for optimal assignment.
REBALANCE_PREFERENCE
}

public enum GlobalRebalancePreferenceKey {
Comment thread
jiajunwang marked this conversation as resolved.
EVENNESS,
LESS_MOVEMENT
}

private final static int DEFAULT_MAX_CONCURRENT_TASK_PER_INSTANCE = 40;
Expand All @@ -95,6 +109,15 @@ public enum ClusterConfigProperty {

public final static String TASK_QUOTA_RATIO_NOT_SET = "-1";

// Default preference for all the aspects should be the same to ensure balanced setup.
public final static Map<GlobalRebalancePreferenceKey, Integer>
DEFAULT_GLOBAL_REBALANCE_PREFERENCE =
ImmutableMap.<GlobalRebalancePreferenceKey, Integer>builder()
.put(GlobalRebalancePreferenceKey.EVENNESS, 1)
.put(GlobalRebalancePreferenceKey.LESS_MOVEMENT, 1).build();
private final static int MAX_REBALANCE_PREFERENCE = 10;
private final static int MIN_REBALANCE_PREFERENCE = 0;

/**
* Instantiate for a specific cluster
* @param cluster the cluster identifier
Expand All @@ -113,21 +136,21 @@ public ClusterConfig(ZNRecord record) {

/**
* Set task quota type with the ratio of this quota.
* @param quotaType String
* @param quotaType String
* @param quotaRatio int
*/
public void setTaskQuotaRatio(String quotaType, int quotaRatio) {
if (_record.getMapField(ClusterConfigProperty.QUOTA_TYPES.name()) == null) {
_record.setMapField(ClusterConfigProperty.QUOTA_TYPES.name(), new HashMap<String, String>());
}
_record.getMapField(ClusterConfigProperty.QUOTA_TYPES.name()).put(quotaType,
Integer.toString(quotaRatio));
_record.getMapField(ClusterConfigProperty.QUOTA_TYPES.name())
.put(quotaType, Integer.toString(quotaRatio));
}

/**
* Set task quota type with the ratio of this quota. Quota ratio must be a String that is
* parse-able into an int.
* @param quotaType String
* @param quotaType String
* @param quotaRatio String
*/
public void setTaskQuotaRatio(String quotaType, String quotaRatio) {
Expand Down Expand Up @@ -210,8 +233,8 @@ public void setPersistBestPossibleAssignment(Boolean enable) {
* @return
*/
public Boolean isPersistIntermediateAssignment() {
return _record.getBooleanField(ClusterConfigProperty.PERSIST_INTERMEDIATE_ASSIGNMENT.toString(),
false);
return _record
.getBooleanField(ClusterConfigProperty.PERSIST_INTERMEDIATE_ASSIGNMENT.toString(), false);
}

/**
Expand All @@ -233,8 +256,8 @@ public void setPersistIntermediateAssignment(Boolean enable) {
}

public Boolean isPipelineTriggersDisabled() {
return _record.getBooleanField(ClusterConfigProperty.HELIX_DISABLE_PIPELINE_TRIGGERS.toString(),
false);
return _record
.getBooleanField(ClusterConfigProperty.HELIX_DISABLE_PIPELINE_TRIGGERS.toString(), false);
}

/**
Expand Down Expand Up @@ -403,8 +426,8 @@ public void setNumOfflineInstancesForAutoExit(int maintenanceRecoveryThreshold)
* @return
*/
public int getNumOfflineInstancesForAutoExit() {
return _record.getIntField(ClusterConfigProperty.NUM_OFFLINE_INSTANCES_FOR_AUTO_EXIT.name(),
-1);
return _record
.getIntField(ClusterConfigProperty.NUM_OFFLINE_INSTANCES_FOR_AUTO_EXIT.name(), -1);
}

/**
Expand Down Expand Up @@ -444,9 +467,7 @@ public boolean equals(Object obj) {
if (obj instanceof ClusterConfig) {
ClusterConfig that = (ClusterConfig) obj;

if (this.getId().equals(that.getId())) {
return true;
}
return this.getId().equals(that.getId());
}
return false;
}
Expand Down Expand Up @@ -490,8 +511,8 @@ public void setStateTransitionThrottleConfigs(
}

if (!configStrs.isEmpty()) {
_record.setListField(ClusterConfigProperty.STATE_TRANSITION_THROTTLE_CONFIGS.name(),
configStrs);
_record
.setListField(ClusterConfigProperty.STATE_TRANSITION_THROTTLE_CONFIGS.name(), configStrs);
}
}

Expand Down Expand Up @@ -579,7 +600,7 @@ public void setMaxConcurrentTaskPerInstance(int maxConcurrentTaskPerInstance) {
public int getErrorPartitionThresholdForLoadBalance() {
return _record.getIntField(
ClusterConfigProperty.ERROR_PARTITION_THRESHOLD_FOR_LOAD_BALANCE.name(),
DEFAULT_ERROR_PARTITION_THRESHOLD_FOR_LOAD_BALANCE);
DEFAULT_ERROR_PARTITION_THRESHOLD_FOR_LOAD_BALANCE);
}

/**
Expand Down Expand Up @@ -657,6 +678,70 @@ public void enableP2PMessage(boolean enabled) {
_record.setBooleanField(HelixConfigProperty.P2P_MESSAGE_ENABLED.name(), enabled);
}

/**
* Set the required Instance Capacity Keys.
* @param capacityKeys
*/
public void setInstanceCapacityKeys(List<String> capacityKeys) {
if (capacityKeys == null || capacityKeys.isEmpty()) {
Comment thread
jiajunwang marked this conversation as resolved.
throw new IllegalArgumentException("The input instance capacity key list is empty.");
}
_record.setListField(ClusterConfigProperty.INSTANCE_CAPACITY_KEYS.name(), capacityKeys);
}

/**
* @return The required Instance Capacity Keys. If not configured, return an empty list.
*/
public List<String> getInstanceCapacityKeys() {
List<String> capacityKeys = _record.getListField(ClusterConfigProperty.INSTANCE_CAPACITY_KEYS.name());
if (capacityKeys == null) {
return Collections.emptyList();
}
return capacityKeys;
}

/**
* Set the global rebalancer's assignment preference.
* @param preference A map of the GlobalRebalancePreferenceKey and the corresponding weight.
* The ratio of the configured weights will determine the rebalancer's behavior.
*/
public void setGlobalRebalancePreference(Map<GlobalRebalancePreferenceKey, Integer> preference) {
Map<String, String> preferenceMap = new HashMap<>();

preference.entrySet().stream().forEach(entry -> {
if (entry.getValue() > MAX_REBALANCE_PREFERENCE
|| entry.getValue() < MIN_REBALANCE_PREFERENCE) {
throw new IllegalArgumentException(String
.format("Invalid global rebalance preference configuration. Key %s, Value %d.",
entry.getKey().name(), entry.getValue()));
}
preferenceMap.put(entry.getKey().name(), Integer.toString(entry.getValue()));
});

_record.setMapField(ClusterConfigProperty.REBALANCE_PREFERENCE.name(), preferenceMap);
}

/**
* Get the global rebalancer's assignment preference.
*/
public Map<GlobalRebalancePreferenceKey, Integer> getGlobalRebalancePreference() {
Map<String, String> preferenceStrMap =
_record.getMapField(ClusterConfigProperty.REBALANCE_PREFERENCE.name());
if (preferenceStrMap != null && !preferenceStrMap.isEmpty()) {
Map<GlobalRebalancePreferenceKey, Integer> preference = new HashMap<>();
Comment thread
jiajunwang marked this conversation as resolved.
for (GlobalRebalancePreferenceKey key : GlobalRebalancePreferenceKey.values()) {
if (!preferenceStrMap.containsKey(key.name())) {
// If any key is not configured with a value, return the default config.
return DEFAULT_GLOBAL_REBALANCE_PREFERENCE;
}
preference.put(key, Integer.parseInt(preferenceStrMap.get(key.name())));
}
return preference;
}
// If configuration is not complete, return the default one.
return DEFAULT_GLOBAL_REBALANCE_PREFERENCE;
}

/**
* Get IdealState rules defined in the cluster config.
* @return
Expand Down
62 changes: 52 additions & 10 deletions helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,14 @@
* under the License.
*/

import com.google.common.base.Splitter;
import org.apache.helix.HelixException;
import org.apache.helix.HelixProperty;
import org.apache.helix.ZNRecord;
import org.apache.helix.util.HelixUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand All @@ -27,15 +35,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.helix.HelixException;
import org.apache.helix.HelixProperty;
import org.apache.helix.ZNRecord;
import org.apache.helix.util.HelixUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Splitter;
import java.util.stream.Collectors;

/**
* Instance configurations
Expand All @@ -55,7 +55,8 @@ public enum InstanceConfigProperty {
INSTANCE_WEIGHT,
DOMAIN,
DELAY_REBALANCE_ENABLED,
MAX_CONCURRENT_TASK
MAX_CONCURRENT_TASK,
INSTANCE_CAPACITY_MAP
}

public static final int WEIGHT_NOT_SET = -1;
Expand Down Expand Up @@ -505,6 +506,47 @@ public void setMaxConcurrentTask(int maxConcurrentTask) {
_record.setIntField(InstanceConfigProperty.MAX_CONCURRENT_TASK.name(), maxConcurrentTask);
}

/**
* Get the instance capacity information from the map fields
*
* @return data map if it exists, or empty map
*/
public Map<String, Integer> getInstanceCapacityMap() {
Map<String, String> capacityData =
_record.getMapField(InstanceConfigProperty.INSTANCE_CAPACITY_MAP.name());

if (capacityData != null) {
return capacityData.entrySet().stream().collect(
Collectors.toMap(entry -> entry.getKey(), entry -> Integer.parseInt(entry.getValue())));
}
return Collections.emptyMap();
}

/**
* Set the instance capacity information with an Integer mapping
* @param capacityDataMap - map of instance capacity data
* @throws IllegalArgumentException - when any of the data value is a negative number or when the map is empty
*/
public void setInstanceCapacityMap(Map<String, Integer> capacityDataMap)
throws IllegalArgumentException {
if (capacityDataMap == null || capacityDataMap.size() == 0) {
throw new IllegalArgumentException("Capacity Data is empty");
}

Map<String, String> capacityData = new HashMap<>();

capacityDataMap.entrySet().stream().forEach(entry -> {
if (entry.getValue() < 0) {
throw new IllegalArgumentException(String
.format("Capacity Data contains a negative value: %s = %d", entry.getKey(),
entry.getValue()));
}
capacityData.put(entry.getKey(), Integer.toString(entry.getValue()));
});

_record.setMapField(InstanceConfigProperty.INSTANCE_CAPACITY_MAP.name(), capacityData);
}

@Override
public boolean equals(Object obj) {
if (obj instanceof InstanceConfig) {
Expand Down
Loading