Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -142,4 +142,87 @@ public static double measureBaselineDivergence(Map<String, ResourceAssignment> b
return numTotalBestPossibleReplicas == 0 ? 1.0d
: (1.0d - (double) numMatchedReplicas / (double) numTotalBestPossibleReplicas);
}

/**
* Calculates average partition weight per capacity key for a resource config. Example as below:
* Input =
* {
* "partition1": {
* "capacity1": 20,
* "capacity2": 40
* },
* "partition2": {
* "capacity1": 30,
* "capacity2": 50
* },
* "partition3": {
* "capacity1": 16,
* "capacity2": 30
* }
* }
*
* Total weight for key "capacity1" = 20 + 30 + 16 = 66;
* Total weight for key "capacity2" = 40 + 50 + 30 = 120;
* Total partitions = 3;
* Average partition weight for "capacity1" = 66 / 3 = 22;
* Average partition weight for "capacity2" = 120 / 3 = 40;
*
* Output =
* {
* "capacity1": 22,
* "capacity2": 40
* }
*
* @param partitionCapacityMap A map of partition capacity:
* <PartitionName or DEFAULT_PARTITION_KEY, <Capacity Key, Capacity Number>>
* @return A map of partition weight: capacity key -> average partition weight
*/
public static Map<String, Integer> calculateAveragePartitionWeight(
Comment thread
huizhilu marked this conversation as resolved.
Map<String, Map<String, Integer>> partitionCapacityMap) {
// capacity key -> [number of partitions, total weight per capacity key]
Map<String, PartitionWeightCounterEntry> countPartitionWeightMap = new HashMap<>();

// Aggregates partition weight for each capacity key.
partitionCapacityMap.values().forEach(partitionCapacityEntry ->
partitionCapacityEntry.forEach((capacityKey, weight) -> countPartitionWeightMap
.computeIfAbsent(capacityKey, counterEntry -> new PartitionWeightCounterEntry())
.increase(1, weight)));

// capacity key -> average partition weight
Map<String, Integer> averagePartitionWeightMap = new HashMap<>();

// Calculate average partition weight for each capacity key.
// Per capacity key level:
// average partition weight = (total partition weight) / (number of partitions)
for (Map.Entry<String, PartitionWeightCounterEntry> entry
: countPartitionWeightMap.entrySet()) {
String capacityKey = entry.getKey();
PartitionWeightCounterEntry weightEntry = entry.getValue();
int averageWeight = (int) (weightEntry.getWeight() / weightEntry.getPartitions());
averagePartitionWeightMap.put(capacityKey, averageWeight);
}

return averagePartitionWeightMap;
}

/*
* Represents total number of partitions and total partition weight for a capacity key.
*/
private static class PartitionWeightCounterEntry {
private int partitions;
private long weight;

private int getPartitions() {
return partitions;
}

private long getWeight() {
return weight;
}

private void increase(int partitions, int weight) {
this.partitions += partitions;
this.weight += weight;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;

import org.apache.helix.controller.LogUtil;
import org.apache.helix.controller.dataproviders.BaseControllerDataProvider;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
import org.apache.helix.controller.pipeline.StageException;
import org.apache.helix.controller.rebalancer.util.ResourceUsageCalculator;
import org.apache.helix.controller.rebalancer.waged.model.AssignableNode;
import org.apache.helix.controller.rebalancer.waged.model.ClusterModel;
import org.apache.helix.controller.rebalancer.waged.model.ClusterModelProvider;
Expand All @@ -41,6 +43,7 @@
import org.apache.helix.model.Partition;
import org.apache.helix.model.Resource;
import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.model.ResourceConfig;
import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -89,8 +92,11 @@ public void process(ClusterEvent event) throws Exception {
final ClusterStatusMonitor clusterStatusMonitor =
event.getAttribute(AttributeName.clusterStatusMonitor.name());
if (clusterStatusMonitor != null && cache instanceof ResourceControllerDataProvider) {
reportInstanceCapacityMetrics(clusterStatusMonitor, (ResourceControllerDataProvider) cache,
resourceToRebalance, currentStateOutput);
final ResourceControllerDataProvider dataProvider = (ResourceControllerDataProvider) cache;
reportInstanceCapacityMetrics(clusterStatusMonitor, dataProvider, resourceToRebalance,
currentStateOutput);
reportResourcePartitionCapacityMetrics(dataProvider.getAsyncTasksThreadPool(),
clusterStatusMonitor, dataProvider.getResourceConfigMap().values());
}
}

Expand Down Expand Up @@ -271,4 +277,22 @@ private void reportInstanceCapacityMetrics(ClusterStatusMonitor clusterStatusMon
return null;
});
}

private void reportResourcePartitionCapacityMetrics(ExecutorService executorService,
ClusterStatusMonitor clusterStatusMonitor, Collection<ResourceConfig> resourceConfigs) {
asyncExecute(executorService, () -> {
try {
for (ResourceConfig config : resourceConfigs) {
Map<String, Integer> averageWeight = ResourceUsageCalculator
.calculateAveragePartitionWeight(config.getPartitionCapacityMap());
clusterStatusMonitor.updatePartitionWeight(config.getResourceName(), averageWeight);
}
} catch (Exception ex) {
LOG.error("Failed to report resource partition capacity metrics. Exception message: {}",
ex.getMessage());
}

return null;
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,25 @@ public void setResourceStatus(ExternalView externalView, IdealState idealState,
}
}

/**
* Updates metrics of average partition weight per capacity key for a resource. If a resource
* monitor is not yet existed for this resource, a new resource monitor will be created for this
* resource.
*
* @param resourceName The resource name for which partition weight is updated
* @param averageWeightMap A map of average partition weight of each capacity key:
* capacity key -> average partition weight
*/
public void updatePartitionWeight(String resourceName, Map<String, Integer> averageWeightMap) {
ResourceMonitor monitor = getOrCreateResourceMonitor(resourceName);
if (monitor == null) {
LOG.warn("Failed to update partition weight metric for resource: {} because resource monitor"
+ " is not created.", resourceName);
return;
}
monitor.updatePartitionWeightStats(averageWeightMap);
}

public void updateMissingTopStateDurationStats(String resourceName, long totalDuration,
long helixLatency, boolean isGraceful, boolean succeeded) {
ResourceMonitor resourceMonitor = getOrCreateResourceMonitor(resourceName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,19 @@
* under the License.
*/

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import javax.management.JMException;
import javax.management.ObjectName;

import com.codahale.metrics.Histogram;
import com.codahale.metrics.SlidingTimeWindowArrayReservoir;
import com.google.common.collect.Lists;
import org.apache.helix.HelixDefinedState;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
Expand All @@ -49,6 +50,8 @@ public enum RebalanceStatus {
INTERMEDIATE_STATE_CAL_FAILED
}

private static final String GAUGE_METRIC_SUFFIX = "Gauge";

// Gauges
private SimpleDynamicMetric<Long> _numOfPartitions;
private SimpleDynamicMetric<Long> _numOfPartitionsInExternalView;
Expand Down Expand Up @@ -83,31 +86,13 @@ public enum RebalanceStatus {
private final String _clusterName;
private final ObjectName _initObjectName;

// A map of dynamic capacity Gauges. The map's keys could change.
private final Map<String, SimpleDynamicMetric<Long>> _dynamicCapacityMetricsMap;

@Override
public ResourceMonitor register() throws JMException {
List<DynamicMetric<?, ?>> attributeList = new ArrayList<>();
attributeList.add(_numOfPartitions);
attributeList.add(_numOfPartitionsInExternalView);
attributeList.add(_numOfErrorPartitions);
attributeList.add(_numNonTopStatePartitions);
attributeList.add(_numLessMinActiveReplicaPartitions);
attributeList.add(_numLessReplicaPartitions);
attributeList.add(_numPendingRecoveryRebalancePartitions);
attributeList.add(_numPendingLoadRebalancePartitions);
attributeList.add(_numRecoveryRebalanceThrottledPartitions);
attributeList.add(_numLoadRebalanceThrottledPartitions);
attributeList.add(_externalViewIdealStateDiff);
attributeList.add(_successfulTopStateHandoffDurationCounter);
attributeList.add(_successTopStateHandoffCounter);
attributeList.add(_failedTopStateHandoffCounter);
attributeList.add(_maxSinglePartitionTopStateHandoffDuration);
attributeList.add(_partitionTopStateHandoffDurationGauge);
attributeList.add(_partitionTopStateHandoffHelixLatencyGauge);
attributeList.add(_partitionTopStateNonGracefulHandoffDurationGauge);
attributeList.add(_totalMessageReceived);
attributeList.add(_numPendingStateTransitions);
attributeList.add(_rebalanceState);
doRegister(attributeList, _initObjectName);
public DynamicMBeanProvider register() throws JMException {
doRegister(buildAttributeList(), _initObjectName);

return this;
}

Expand All @@ -116,10 +101,12 @@ public enum MonitorState {
}

@SuppressWarnings("unchecked")
public ResourceMonitor(String clusterName, String resourceName, ObjectName objectName) {
public ResourceMonitor(String clusterName, String resourceName, ObjectName objectName)
throws JMException {
_clusterName = clusterName;
_resourceName = resourceName;
_initObjectName = objectName;
_dynamicCapacityMetricsMap = new ConcurrentHashMap<>();

_externalViewIdealStateDiff = new SimpleDynamicMetric("DifferenceWithIdealStateGauge", 0L);
_numLoadRebalanceThrottledPartitions =
Expand Down Expand Up @@ -382,6 +369,36 @@ public void updateRebalancerStats(long numPendingRecoveryRebalancePartitions,
_numLoadRebalanceThrottledPartitions.updateValue(numLoadRebalanceThrottledPartitions);
}

/**
* Updates partition weight metric. If the partition capacity keys are changed, all MBean
* attributes will be updated accordingly: old capacity keys will be replaced with new capacity
* keys in MBean server.
*
* @param partitionWeightMap A map of partition weight: capacity key -> partition weight
*/
void updatePartitionWeightStats(Map<String, Integer> partitionWeightMap) {
synchronized (_dynamicCapacityMetricsMap) {
if (_dynamicCapacityMetricsMap.keySet().equals(partitionWeightMap.keySet())) {
for (Map.Entry<String, Integer> entry : partitionWeightMap.entrySet()) {
_dynamicCapacityMetricsMap.get(entry.getKey()).updateValue((long) entry.getValue());
}
return;
}

// Capacity keys are changed, so capacity attribute map needs to be updated.
_dynamicCapacityMetricsMap.clear();
for (Map.Entry<String, Integer> entry : partitionWeightMap.entrySet()) {
String capacityKey = entry.getKey();
_dynamicCapacityMetricsMap.put(capacityKey,
new SimpleDynamicMetric<>(capacityKey + GAUGE_METRIC_SUFFIX, (long) entry.getValue()));
}
}

// Update all MBean attributes.
updateAttributesInfo(buildAttributeList(),
"Resource monitor for resource: " + getResourceName());
}

public void setRebalanceState(RebalanceStatus state) {
_rebalanceState.updateValue(state.name());
}
Expand Down Expand Up @@ -428,4 +445,34 @@ public void resetMaxTopStateHandoffGauge() {
_lastResetTime = System.currentTimeMillis();
}
}

private List<DynamicMetric<?, ?>> buildAttributeList() {
List<DynamicMetric<?, ?>> attributeList = Lists.newArrayList(
_numOfPartitions,
_numOfPartitionsInExternalView,
_numOfErrorPartitions,
_numNonTopStatePartitions,
_numLessMinActiveReplicaPartitions,
_numLessReplicaPartitions,
_numPendingRecoveryRebalancePartitions,
_numPendingLoadRebalancePartitions,
_numRecoveryRebalanceThrottledPartitions,
_numLoadRebalanceThrottledPartitions,
_externalViewIdealStateDiff,
_successfulTopStateHandoffDurationCounter,
_successTopStateHandoffCounter,
_failedTopStateHandoffCounter,
_maxSinglePartitionTopStateHandoffDuration,
_partitionTopStateHandoffDurationGauge,
_partitionTopStateHandoffHelixLatencyGauge,
_partitionTopStateNonGracefulHandoffDurationGauge,
_totalMessageReceived,
_numPendingStateTransitions,
_rebalanceState
);

attributeList.addAll(_dynamicCapacityMetricsMap.values());

return attributeList;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,22 @@ public void testMeasureBaselineDivergence(Map<String, Map<String, Map<String, St
0.0d);
}

@Test
public void testCalculateAveragePartitionWeight() {
Map<String, Map<String, Integer>> partitionCapacityMap = ImmutableMap.of(
"partition1", ImmutableMap.of("capacity1", 20, "capacity2", 40),
"partition2", ImmutableMap.of("capacity1", 30, "capacity2", 50),
"partition3", ImmutableMap.of("capacity1", 16, "capacity2", 30));

Map<String, Integer> averageCapacityWeightMap =
ResourceUsageCalculator.calculateAveragePartitionWeight(partitionCapacityMap);
Map<String, Integer> expectedAverageWeightMap =
ImmutableMap.of("capacity1", 22, "capacity2", 40);

Assert.assertNotNull(averageCapacityWeightMap);
Assert.assertEquals(averageCapacityWeightMap, expectedAverageWeightMap);
}

private Map<String, ResourceAssignment> buildResourceAssignment(
Map<String, Map<String, Map<String, String>>> resourceMap) {
Map<String, ResourceAssignment> assignment = new HashMap<>();
Expand All @@ -78,7 +94,7 @@ private Map<String, ResourceAssignment> buildResourceAssignment(
}

@DataProvider(name = "TestMeasureBaselineDivergenceInput")
public Object[][] loadTestMeasureBaselineDivergenceInput() {
private Object[][] loadTestMeasureBaselineDivergenceInput() {
final String[] params =
new String[]{"baseline", "someMatchBestPossible", "noMatchBestPossible"};
return TestInputLoader
Expand Down
Loading