diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ResourceUsageCalculator.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ResourceUsageCalculator.java index ad58eaeaba..e7a1b948b8 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ResourceUsageCalculator.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ResourceUsageCalculator.java @@ -142,4 +142,87 @@ public static double measureBaselineDivergence(Map b return numTotalBestPossibleReplicas == 0 ? 1.0d : (1.0d - (double) numMatchedReplicas / (double) numTotalBestPossibleReplicas); } + + /** + * Calculates average partition weight per capacity key for a resource config. Example as below: + * Input = + * { + * "partition1": { + * "capacity1": 20, + * "capacity2": 40 + * }, + * "partition2": { + * "capacity1": 30, + * "capacity2": 50 + * }, + * "partition3": { + * "capacity1": 16, + * "capacity2": 30 + * } + * } + * + * Total weight for key "capacity1" = 20 + 30 + 16 = 66; + * Total weight for key "capacity2" = 40 + 50 + 30 = 120; + * Total partitions = 3; + * Average partition weight for "capacity1" = 66 / 3 = 22; + * Average partition weight for "capacity2" = 120 / 3 = 40; + * + * Output = + * { + * "capacity1": 22, + * "capacity2": 40 + * } + * + * @param partitionCapacityMap A map of partition capacity: + * > + * @return A map of partition weight: capacity key -> average partition weight + */ + public static Map calculateAveragePartitionWeight( + Map> partitionCapacityMap) { + // capacity key -> [number of partitions, total weight per capacity key] + Map countPartitionWeightMap = new HashMap<>(); + + // Aggregates partition weight for each capacity key. + partitionCapacityMap.values().forEach(partitionCapacityEntry -> + partitionCapacityEntry.forEach((capacityKey, weight) -> countPartitionWeightMap + .computeIfAbsent(capacityKey, counterEntry -> new PartitionWeightCounterEntry()) + .increase(1, weight))); + + // capacity key -> average partition weight + Map averagePartitionWeightMap = new HashMap<>(); + + // Calculate average partition weight for each capacity key. + // Per capacity key level: + // average partition weight = (total partition weight) / (number of partitions) + for (Map.Entry entry + : countPartitionWeightMap.entrySet()) { + String capacityKey = entry.getKey(); + PartitionWeightCounterEntry weightEntry = entry.getValue(); + int averageWeight = (int) (weightEntry.getWeight() / weightEntry.getPartitions()); + averagePartitionWeightMap.put(capacityKey, averageWeight); + } + + return averagePartitionWeightMap; + } + + /* + * Represents total number of partitions and total partition weight for a capacity key. + */ + private static class PartitionWeightCounterEntry { + private int partitions; + private long weight; + + private int getPartitions() { + return partitions; + } + + private long getWeight() { + return weight; + } + + private void increase(int partitions, int weight) { + this.partitions += partitions; + this.weight += weight; + } + } } diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java index 727804b6f7..62fda33f90 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java @@ -23,6 +23,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutorService; import java.util.stream.Collectors; import org.apache.helix.controller.LogUtil; @@ -30,6 +31,7 @@ import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; import org.apache.helix.controller.pipeline.AbstractBaseStage; import org.apache.helix.controller.pipeline.StageException; +import org.apache.helix.controller.rebalancer.util.ResourceUsageCalculator; import org.apache.helix.controller.rebalancer.waged.model.AssignableNode; import org.apache.helix.controller.rebalancer.waged.model.ClusterModel; import org.apache.helix.controller.rebalancer.waged.model.ClusterModelProvider; @@ -41,6 +43,7 @@ import org.apache.helix.model.Partition; import org.apache.helix.model.Resource; import org.apache.helix.model.ResourceAssignment; +import org.apache.helix.model.ResourceConfig; import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -89,8 +92,11 @@ public void process(ClusterEvent event) throws Exception { final ClusterStatusMonitor clusterStatusMonitor = event.getAttribute(AttributeName.clusterStatusMonitor.name()); if (clusterStatusMonitor != null && cache instanceof ResourceControllerDataProvider) { - reportInstanceCapacityMetrics(clusterStatusMonitor, (ResourceControllerDataProvider) cache, - resourceToRebalance, currentStateOutput); + final ResourceControllerDataProvider dataProvider = (ResourceControllerDataProvider) cache; + reportInstanceCapacityMetrics(clusterStatusMonitor, dataProvider, resourceToRebalance, + currentStateOutput); + reportResourcePartitionCapacityMetrics(dataProvider.getAsyncTasksThreadPool(), + clusterStatusMonitor, dataProvider.getResourceConfigMap().values()); } } @@ -271,4 +277,22 @@ private void reportInstanceCapacityMetrics(ClusterStatusMonitor clusterStatusMon return null; }); } + + private void reportResourcePartitionCapacityMetrics(ExecutorService executorService, + ClusterStatusMonitor clusterStatusMonitor, Collection resourceConfigs) { + asyncExecute(executorService, () -> { + try { + for (ResourceConfig config : resourceConfigs) { + Map averageWeight = ResourceUsageCalculator + .calculateAveragePartitionWeight(config.getPartitionCapacityMap()); + clusterStatusMonitor.updatePartitionWeight(config.getResourceName(), averageWeight); + } + } catch (Exception ex) { + LOG.error("Failed to report resource partition capacity metrics. Exception message: {}", + ex.getMessage()); + } + + return null; + }); + } } diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java index 96f85bf499..fc0b19d83d 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java @@ -497,6 +497,25 @@ public void setResourceStatus(ExternalView externalView, IdealState idealState, } } + /** + * Updates metrics of average partition weight per capacity key for a resource. If a resource + * monitor is not yet existed for this resource, a new resource monitor will be created for this + * resource. + * + * @param resourceName The resource name for which partition weight is updated + * @param averageWeightMap A map of average partition weight of each capacity key: + * capacity key -> average partition weight + */ + public void updatePartitionWeight(String resourceName, Map averageWeightMap) { + ResourceMonitor monitor = getOrCreateResourceMonitor(resourceName); + if (monitor == null) { + LOG.warn("Failed to update partition weight metric for resource: {} because resource monitor" + + " is not created.", resourceName); + return; + } + monitor.updatePartitionWeightStats(averageWeightMap); + } + public void updateMissingTopStateDurationStats(String resourceName, long totalDuration, long helixLatency, boolean isGraceful, boolean succeeded) { ResourceMonitor resourceMonitor = getOrCreateResourceMonitor(resourceName); diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java index d7a368e863..af9c318a4d 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java @@ -19,18 +19,19 @@ * under the License. */ -import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import javax.management.JMException; import javax.management.ObjectName; import com.codahale.metrics.Histogram; import com.codahale.metrics.SlidingTimeWindowArrayReservoir; +import com.google.common.collect.Lists; import org.apache.helix.HelixDefinedState; import org.apache.helix.model.ExternalView; import org.apache.helix.model.IdealState; @@ -49,6 +50,8 @@ public enum RebalanceStatus { INTERMEDIATE_STATE_CAL_FAILED } + private static final String GAUGE_METRIC_SUFFIX = "Gauge"; + // Gauges private SimpleDynamicMetric _numOfPartitions; private SimpleDynamicMetric _numOfPartitionsInExternalView; @@ -83,31 +86,13 @@ public enum RebalanceStatus { private final String _clusterName; private final ObjectName _initObjectName; + // A map of dynamic capacity Gauges. The map's keys could change. + private final Map> _dynamicCapacityMetricsMap; + @Override - public ResourceMonitor register() throws JMException { - List> attributeList = new ArrayList<>(); - attributeList.add(_numOfPartitions); - attributeList.add(_numOfPartitionsInExternalView); - attributeList.add(_numOfErrorPartitions); - attributeList.add(_numNonTopStatePartitions); - attributeList.add(_numLessMinActiveReplicaPartitions); - attributeList.add(_numLessReplicaPartitions); - attributeList.add(_numPendingRecoveryRebalancePartitions); - attributeList.add(_numPendingLoadRebalancePartitions); - attributeList.add(_numRecoveryRebalanceThrottledPartitions); - attributeList.add(_numLoadRebalanceThrottledPartitions); - attributeList.add(_externalViewIdealStateDiff); - attributeList.add(_successfulTopStateHandoffDurationCounter); - attributeList.add(_successTopStateHandoffCounter); - attributeList.add(_failedTopStateHandoffCounter); - attributeList.add(_maxSinglePartitionTopStateHandoffDuration); - attributeList.add(_partitionTopStateHandoffDurationGauge); - attributeList.add(_partitionTopStateHandoffHelixLatencyGauge); - attributeList.add(_partitionTopStateNonGracefulHandoffDurationGauge); - attributeList.add(_totalMessageReceived); - attributeList.add(_numPendingStateTransitions); - attributeList.add(_rebalanceState); - doRegister(attributeList, _initObjectName); + public DynamicMBeanProvider register() throws JMException { + doRegister(buildAttributeList(), _initObjectName); + return this; } @@ -116,10 +101,12 @@ public enum MonitorState { } @SuppressWarnings("unchecked") - public ResourceMonitor(String clusterName, String resourceName, ObjectName objectName) { + public ResourceMonitor(String clusterName, String resourceName, ObjectName objectName) + throws JMException { _clusterName = clusterName; _resourceName = resourceName; _initObjectName = objectName; + _dynamicCapacityMetricsMap = new ConcurrentHashMap<>(); _externalViewIdealStateDiff = new SimpleDynamicMetric("DifferenceWithIdealStateGauge", 0L); _numLoadRebalanceThrottledPartitions = @@ -382,6 +369,36 @@ public void updateRebalancerStats(long numPendingRecoveryRebalancePartitions, _numLoadRebalanceThrottledPartitions.updateValue(numLoadRebalanceThrottledPartitions); } + /** + * Updates partition weight metric. If the partition capacity keys are changed, all MBean + * attributes will be updated accordingly: old capacity keys will be replaced with new capacity + * keys in MBean server. + * + * @param partitionWeightMap A map of partition weight: capacity key -> partition weight + */ + void updatePartitionWeightStats(Map partitionWeightMap) { + synchronized (_dynamicCapacityMetricsMap) { + if (_dynamicCapacityMetricsMap.keySet().equals(partitionWeightMap.keySet())) { + for (Map.Entry entry : partitionWeightMap.entrySet()) { + _dynamicCapacityMetricsMap.get(entry.getKey()).updateValue((long) entry.getValue()); + } + return; + } + + // Capacity keys are changed, so capacity attribute map needs to be updated. + _dynamicCapacityMetricsMap.clear(); + for (Map.Entry entry : partitionWeightMap.entrySet()) { + String capacityKey = entry.getKey(); + _dynamicCapacityMetricsMap.put(capacityKey, + new SimpleDynamicMetric<>(capacityKey + GAUGE_METRIC_SUFFIX, (long) entry.getValue())); + } + } + + // Update all MBean attributes. + updateAttributesInfo(buildAttributeList(), + "Resource monitor for resource: " + getResourceName()); + } + public void setRebalanceState(RebalanceStatus state) { _rebalanceState.updateValue(state.name()); } @@ -428,4 +445,34 @@ public void resetMaxTopStateHandoffGauge() { _lastResetTime = System.currentTimeMillis(); } } + + private List> buildAttributeList() { + List> attributeList = Lists.newArrayList( + _numOfPartitions, + _numOfPartitionsInExternalView, + _numOfErrorPartitions, + _numNonTopStatePartitions, + _numLessMinActiveReplicaPartitions, + _numLessReplicaPartitions, + _numPendingRecoveryRebalancePartitions, + _numPendingLoadRebalancePartitions, + _numRecoveryRebalanceThrottledPartitions, + _numLoadRebalanceThrottledPartitions, + _externalViewIdealStateDiff, + _successfulTopStateHandoffDurationCounter, + _successTopStateHandoffCounter, + _failedTopStateHandoffCounter, + _maxSinglePartitionTopStateHandoffDuration, + _partitionTopStateHandoffDurationGauge, + _partitionTopStateHandoffHelixLatencyGauge, + _partitionTopStateNonGracefulHandoffDurationGauge, + _totalMessageReceived, + _numPendingStateTransitions, + _rebalanceState + ); + + attributeList.addAll(_dynamicCapacityMetricsMap.values()); + + return attributeList; + } } diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/util/TestResourceUsageCalculator.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/util/TestResourceUsageCalculator.java index b0b2142da8..ef1737f567 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/util/TestResourceUsageCalculator.java +++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/util/TestResourceUsageCalculator.java @@ -60,6 +60,22 @@ public void testMeasureBaselineDivergence(Map> partitionCapacityMap = ImmutableMap.of( + "partition1", ImmutableMap.of("capacity1", 20, "capacity2", 40), + "partition2", ImmutableMap.of("capacity1", 30, "capacity2", 50), + "partition3", ImmutableMap.of("capacity1", 16, "capacity2", 30)); + + Map averageCapacityWeightMap = + ResourceUsageCalculator.calculateAveragePartitionWeight(partitionCapacityMap); + Map expectedAverageWeightMap = + ImmutableMap.of("capacity1", 22, "capacity2", 40); + + Assert.assertNotNull(averageCapacityWeightMap); + Assert.assertEquals(averageCapacityWeightMap, expectedAverageWeightMap); + } + private Map buildResourceAssignment( Map>> resourceMap) { Map assignment = new HashMap<>(); @@ -78,7 +94,7 @@ private Map buildResourceAssignment( } @DataProvider(name = "TestMeasureBaselineDivergenceInput") - public Object[][] loadTestMeasureBaselineDivergenceInput() { + private Object[][] loadTestMeasureBaselineDivergenceInput() { final String[] params = new String[]{"baseline", "someMatchBestPossible", "noMatchBestPossible"}; return TestInputLoader diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResourceMonitor.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResourceMonitor.java index 18576dda66..f630124106 100644 --- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResourceMonitor.java +++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResourceMonitor.java @@ -19,15 +19,24 @@ * under the License. */ +import java.io.IOException; +import java.lang.management.ManagementFactory; import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Random; import java.util.TreeMap; +import javax.management.AttributeNotFoundException; +import javax.management.InstanceNotFoundException; import javax.management.JMException; +import javax.management.MBeanException; +import javax.management.MBeanServerConnection; import javax.management.ObjectName; +import javax.management.ReflectionException; +import com.google.common.collect.ImmutableMap; +import org.apache.helix.TestHelper; import org.apache.helix.ZNRecord; import org.apache.helix.model.BuiltInStateModelDefinitions; import org.apache.helix.model.ExternalView; @@ -46,168 +55,220 @@ public class TestResourceMonitor { @Test() public void testReportData() throws JMException { final int n = 5; - ResourceMonitor monitor = new ResourceMonitor(_clusterName, _dbName, new ObjectName("testDomain:key=value")); + ResourceMonitor monitor = + new ResourceMonitor(_clusterName, _dbName, new ObjectName("testDomain:key=value")); monitor.register(); - List instances = new ArrayList<>(); - for (int i = 0; i < n; i++) { - String instance = "localhost_" + (12918 + i); - instances.add(instance); - } + try { + List instances = new ArrayList<>(); + for (int i = 0; i < n; i++) { + String instance = "localhost_" + (12918 + i); + instances.add(instance); + } + + ZNRecord idealStateRecord = DefaultIdealStateCalculator + .calculateIdealState(instances, _partitions, _replicas - 1, _dbName, "MASTER", "SLAVE"); + IdealState idealState = new IdealState(deepCopyZNRecord(idealStateRecord)); + idealState.setMinActiveReplicas(_replicas - 1); + ExternalView externalView = new ExternalView(deepCopyZNRecord(idealStateRecord)); + StateModelDefinition stateModelDef = + BuiltInStateModelDefinitions.MasterSlave.getStateModelDefinition(); + + monitor.updateResourceState(externalView, idealState, stateModelDef); + + Assert.assertEquals(monitor.getDifferenceWithIdealStateGauge(), 0); + Assert.assertEquals(monitor.getErrorPartitionGauge(), 0); + Assert.assertEquals(monitor.getExternalViewPartitionGauge(), _partitions); + Assert.assertEquals(monitor.getPartitionGauge(), _partitions); + Assert.assertEquals(monitor.getMissingMinActiveReplicaPartitionGauge(), 0); + Assert.assertEquals(monitor.getMissingReplicaPartitionGauge(), 0); + Assert.assertEquals(monitor.getMissingTopStatePartitionGauge(), 0); + Assert.assertEquals(monitor.getBeanName(), _clusterName + " " + _dbName); - ZNRecord idealStateRecord = DefaultIdealStateCalculator - .calculateIdealState(instances, _partitions, _replicas - 1, _dbName, "MASTER", "SLAVE"); - IdealState idealState = new IdealState(deepCopyZNRecord(idealStateRecord)); - idealState.setMinActiveReplicas(_replicas - 1); - ExternalView externalView = new ExternalView(deepCopyZNRecord(idealStateRecord)); - StateModelDefinition stateModelDef = - BuiltInStateModelDefinitions.MasterSlave.getStateModelDefinition(); - - monitor.updateResourceState(externalView, idealState, stateModelDef); - - Assert.assertEquals(monitor.getDifferenceWithIdealStateGauge(), 0); - Assert.assertEquals(monitor.getErrorPartitionGauge(), 0); - Assert.assertEquals(monitor.getExternalViewPartitionGauge(), _partitions); - Assert.assertEquals(monitor.getPartitionGauge(), _partitions); - Assert.assertEquals(monitor.getMissingMinActiveReplicaPartitionGauge(), 0); - Assert.assertEquals(monitor.getMissingReplicaPartitionGauge(), 0); - Assert.assertEquals(monitor.getMissingTopStatePartitionGauge(), 0); - Assert.assertEquals(monitor.getBeanName(), _clusterName + " " + _dbName); - - int errorCount = 5; - Random r = new Random(); - int start = r.nextInt(_partitions - errorCount - 1); - for (int i = start; i < start + errorCount; i++) { - String partition = _dbName + "_" + i; - Map map = externalView.getStateMap(partition); - for (String key : map.keySet()) { - if (map.get(key).equalsIgnoreCase("SLAVE")) { - map.put(key, "ERROR"); - break; + int errorCount = 5; + Random r = new Random(); + int start = r.nextInt(_partitions - errorCount - 1); + for (int i = start; i < start + errorCount; i++) { + String partition = _dbName + "_" + i; + Map map = externalView.getStateMap(partition); + for (String key : map.keySet()) { + if (map.get(key).equalsIgnoreCase("SLAVE")) { + map.put(key, "ERROR"); + break; + } } + externalView.setStateMap(partition, map); } - externalView.setStateMap(partition, map); - } - monitor.updateResourceState(externalView, idealState, stateModelDef); - - Assert.assertEquals(monitor.getDifferenceWithIdealStateGauge(), errorCount); - Assert.assertEquals(monitor.getErrorPartitionGauge(), errorCount); - Assert.assertEquals(monitor.getExternalViewPartitionGauge(), _partitions); - Assert.assertEquals(monitor.getPartitionGauge(), _partitions); - Assert.assertEquals(monitor.getMissingMinActiveReplicaPartitionGauge(), 0); - Assert.assertEquals(monitor.getMissingReplicaPartitionGauge(), errorCount); - Assert.assertEquals(monitor.getMissingTopStatePartitionGauge(), 0); - - int lessMinActiveReplica = 6; - externalView = new ExternalView(deepCopyZNRecord(idealStateRecord)); - start = r.nextInt(_partitions - lessMinActiveReplica - 1); - for (int i = start; i < start + lessMinActiveReplica; i++) { - String partition = _dbName + "_" + i; - Map map = externalView.getStateMap(partition); - Iterator it = map.keySet().iterator(); - int flag = 0; - while (it.hasNext()) { - String key = it.next(); - if (map.get(key).equalsIgnoreCase("SLAVE")) { - if (flag++ % 2 == 0) { - map.put(key, "OFFLINE"); - } else { - it.remove(); + monitor.updateResourceState(externalView, idealState, stateModelDef); + + Assert.assertEquals(monitor.getDifferenceWithIdealStateGauge(), errorCount); + Assert.assertEquals(monitor.getErrorPartitionGauge(), errorCount); + Assert.assertEquals(monitor.getExternalViewPartitionGauge(), _partitions); + Assert.assertEquals(monitor.getPartitionGauge(), _partitions); + Assert.assertEquals(monitor.getMissingMinActiveReplicaPartitionGauge(), 0); + Assert.assertEquals(monitor.getMissingReplicaPartitionGauge(), errorCount); + Assert.assertEquals(monitor.getMissingTopStatePartitionGauge(), 0); + + int lessMinActiveReplica = 6; + externalView = new ExternalView(deepCopyZNRecord(idealStateRecord)); + start = r.nextInt(_partitions - lessMinActiveReplica - 1); + for (int i = start; i < start + lessMinActiveReplica; i++) { + String partition = _dbName + "_" + i; + Map map = externalView.getStateMap(partition); + Iterator it = map.keySet().iterator(); + int flag = 0; + while (it.hasNext()) { + String key = it.next(); + if (map.get(key).equalsIgnoreCase("SLAVE")) { + if (flag++ % 2 == 0) { + map.put(key, "OFFLINE"); + } else { + it.remove(); + } } } + externalView.setStateMap(partition, map); } - externalView.setStateMap(partition, map); - } - monitor.updateResourceState(externalView, idealState, stateModelDef); - - Assert.assertEquals(monitor.getDifferenceWithIdealStateGauge(), lessMinActiveReplica); - Assert.assertEquals(monitor.getErrorPartitionGauge(), 0); - Assert.assertEquals(monitor.getExternalViewPartitionGauge(), _partitions); - Assert.assertEquals(monitor.getPartitionGauge(), _partitions); - Assert.assertEquals(monitor.getMissingMinActiveReplicaPartitionGauge(), lessMinActiveReplica); - Assert.assertEquals(monitor.getMissingReplicaPartitionGauge(), lessMinActiveReplica); - Assert.assertEquals(monitor.getMissingTopStatePartitionGauge(), 0); - - int lessReplica = 4; - externalView = new ExternalView(deepCopyZNRecord(idealStateRecord)); - start = r.nextInt(_partitions - lessReplica - 1); - for (int i = start; i < start + lessReplica; i++) { - String partition = _dbName + "_" + i; - Map map = externalView.getStateMap(partition); - int flag = 0; - Iterator it = map.keySet().iterator(); - while (it.hasNext()) { - String key = it.next(); - if (map.get(key).equalsIgnoreCase("SLAVE")) { - if (flag++ % 2 == 0) { - map.put(key, "OFFLINE"); - } else { - it.remove(); + monitor.updateResourceState(externalView, idealState, stateModelDef); + + Assert.assertEquals(monitor.getDifferenceWithIdealStateGauge(), lessMinActiveReplica); + Assert.assertEquals(monitor.getErrorPartitionGauge(), 0); + Assert.assertEquals(monitor.getExternalViewPartitionGauge(), _partitions); + Assert.assertEquals(monitor.getPartitionGauge(), _partitions); + Assert.assertEquals(monitor.getMissingMinActiveReplicaPartitionGauge(), lessMinActiveReplica); + Assert.assertEquals(monitor.getMissingReplicaPartitionGauge(), lessMinActiveReplica); + Assert.assertEquals(monitor.getMissingTopStatePartitionGauge(), 0); + + int lessReplica = 4; + externalView = new ExternalView(deepCopyZNRecord(idealStateRecord)); + start = r.nextInt(_partitions - lessReplica - 1); + for (int i = start; i < start + lessReplica; i++) { + String partition = _dbName + "_" + i; + Map map = externalView.getStateMap(partition); + int flag = 0; + Iterator it = map.keySet().iterator(); + while (it.hasNext()) { + String key = it.next(); + if (map.get(key).equalsIgnoreCase("SLAVE")) { + if (flag++ % 2 == 0) { + map.put(key, "OFFLINE"); + } else { + it.remove(); + } + break; } - break; } + externalView.setStateMap(partition, map); } - externalView.setStateMap(partition, map); - } - monitor.updateResourceState(externalView, idealState, stateModelDef); - - Assert.assertEquals(monitor.getDifferenceWithIdealStateGauge(), lessReplica); - Assert.assertEquals(monitor.getErrorPartitionGauge(), 0); - Assert.assertEquals(monitor.getExternalViewPartitionGauge(), _partitions); - Assert.assertEquals(monitor.getPartitionGauge(), _partitions); - Assert.assertEquals(monitor.getMissingMinActiveReplicaPartitionGauge(), 0); - Assert.assertEquals(monitor.getMissingReplicaPartitionGauge(), lessReplica); - Assert.assertEquals(monitor.getMissingTopStatePartitionGauge(), 0); - - int missTopState = 7; - externalView = new ExternalView(deepCopyZNRecord(idealStateRecord)); - start = r.nextInt(_partitions - missTopState - 1); - for (int i = start; i < start + missTopState; i++) { - String partition = _dbName + "_" + i; - Map map = externalView.getStateMap(partition); - int flag = 0; - for (String key : map.keySet()) { - if (map.get(key).equalsIgnoreCase("MASTER")) { - if (flag++ % 2 == 0) { - map.put(key, "OFFLINE"); - } else { - map.remove(key); + monitor.updateResourceState(externalView, idealState, stateModelDef); + + Assert.assertEquals(monitor.getDifferenceWithIdealStateGauge(), lessReplica); + Assert.assertEquals(monitor.getErrorPartitionGauge(), 0); + Assert.assertEquals(monitor.getExternalViewPartitionGauge(), _partitions); + Assert.assertEquals(monitor.getPartitionGauge(), _partitions); + Assert.assertEquals(monitor.getMissingMinActiveReplicaPartitionGauge(), 0); + Assert.assertEquals(monitor.getMissingReplicaPartitionGauge(), lessReplica); + Assert.assertEquals(monitor.getMissingTopStatePartitionGauge(), 0); + + int missTopState = 7; + externalView = new ExternalView(deepCopyZNRecord(idealStateRecord)); + start = r.nextInt(_partitions - missTopState - 1); + for (int i = start; i < start + missTopState; i++) { + String partition = _dbName + "_" + i; + Map map = externalView.getStateMap(partition); + int flag = 0; + for (String key : map.keySet()) { + if (map.get(key).equalsIgnoreCase("MASTER")) { + if (flag++ % 2 == 0) { + map.put(key, "OFFLINE"); + } else { + map.remove(key); + } + break; } - break; } + externalView.setStateMap(partition, map); } - externalView.setStateMap(partition, map); + + monitor.updateResourceState(externalView, idealState, stateModelDef); + + Assert.assertEquals(monitor.getDifferenceWithIdealStateGauge(), missTopState); + Assert.assertEquals(monitor.getErrorPartitionGauge(), 0); + Assert.assertEquals(monitor.getExternalViewPartitionGauge(), _partitions); + Assert.assertEquals(monitor.getPartitionGauge(), _partitions); + Assert.assertEquals(monitor.getMissingMinActiveReplicaPartitionGauge(), 0); + Assert.assertEquals(monitor.getMissingReplicaPartitionGauge(), missTopState); + Assert.assertEquals(monitor.getMissingTopStatePartitionGauge(), missTopState); + + Assert.assertEquals(monitor.getNumPendingStateTransitionGauge(), 0); + // test pending state transition message report and read + int messageCount = new Random().nextInt(_partitions) + 1; + monitor.updatePendingStateTransitionMessages(messageCount); + Assert.assertEquals(monitor.getNumPendingStateTransitionGauge(), messageCount); + + Assert.assertEquals(monitor.getRebalanceState(), + ResourceMonitor.RebalanceStatus.UNKNOWN.name()); + monitor.setRebalanceState(ResourceMonitor.RebalanceStatus.NORMAL); + Assert + .assertEquals(monitor.getRebalanceState(), ResourceMonitor.RebalanceStatus.NORMAL.name()); + monitor.setRebalanceState(ResourceMonitor.RebalanceStatus.BEST_POSSIBLE_STATE_CAL_FAILED); + Assert.assertEquals(monitor.getRebalanceState(), + ResourceMonitor.RebalanceStatus.BEST_POSSIBLE_STATE_CAL_FAILED.name()); + monitor.setRebalanceState(ResourceMonitor.RebalanceStatus.INTERMEDIATE_STATE_CAL_FAILED); + Assert.assertEquals(monitor.getRebalanceState(), + ResourceMonitor.RebalanceStatus.INTERMEDIATE_STATE_CAL_FAILED.name()); + } finally { + // Has to unregister this monitor to clean up. Otherwise, later tests may be affected and fail. + monitor.unregister(); } + } + + @Test + public void testUpdatePartitionWeightStats() throws JMException, IOException { + final MBeanServerConnection mBeanServer = ManagementFactory.getPlatformMBeanServer(); + final String clusterName = TestHelper.getTestMethodName(); + final String resource = "testDB"; + final ObjectName resourceObjectName = new ObjectName("testDomain:key=value"); + final ResourceMonitor monitor = + new ResourceMonitor(clusterName, resource, resourceObjectName); + monitor.register(); + + try { + Map> partitionWeightMap = + ImmutableMap.of(resource, ImmutableMap.of("capacity1", 20, "capacity2", 40)); + + // Update Metrics + partitionWeightMap.values().forEach(monitor::updatePartitionWeightStats); + + verifyPartitionWeightMetrics(mBeanServer, resourceObjectName, partitionWeightMap); + + // Change capacity keys: "capacity2" -> "capacity3" + partitionWeightMap = + ImmutableMap.of(resource, ImmutableMap.of("capacity1", 20, "capacity3", 60)); + + // Update metrics. + partitionWeightMap.values().forEach(monitor::updatePartitionWeightStats); - monitor.updateResourceState(externalView, idealState, stateModelDef); - - Assert.assertEquals(monitor.getDifferenceWithIdealStateGauge(), missTopState); - Assert.assertEquals(monitor.getErrorPartitionGauge(), 0); - Assert.assertEquals(monitor.getExternalViewPartitionGauge(), _partitions); - Assert.assertEquals(monitor.getPartitionGauge(), _partitions); - Assert.assertEquals(monitor.getMissingMinActiveReplicaPartitionGauge(), 0); - Assert.assertEquals(monitor.getMissingReplicaPartitionGauge(), missTopState); - Assert.assertEquals(monitor.getMissingTopStatePartitionGauge(), missTopState); - - Assert.assertEquals(monitor.getNumPendingStateTransitionGauge(), 0); - // test pending state transition message report and read - int messageCount = new Random().nextInt(_partitions) + 1; - monitor.updatePendingStateTransitionMessages(messageCount); - Assert.assertEquals(monitor.getNumPendingStateTransitionGauge(), messageCount); - - Assert - .assertEquals(monitor.getRebalanceState(), ResourceMonitor.RebalanceStatus.UNKNOWN.name()); - monitor.setRebalanceState(ResourceMonitor.RebalanceStatus.NORMAL); - Assert.assertEquals(monitor.getRebalanceState(), ResourceMonitor.RebalanceStatus.NORMAL.name()); - monitor.setRebalanceState(ResourceMonitor.RebalanceStatus.BEST_POSSIBLE_STATE_CAL_FAILED); - Assert.assertEquals(monitor.getRebalanceState(), - ResourceMonitor.RebalanceStatus.BEST_POSSIBLE_STATE_CAL_FAILED.name()); - monitor.setRebalanceState(ResourceMonitor.RebalanceStatus.INTERMEDIATE_STATE_CAL_FAILED); - Assert.assertEquals(monitor.getRebalanceState(), - ResourceMonitor.RebalanceStatus.INTERMEDIATE_STATE_CAL_FAILED.name()); + // Verify results. + verifyPartitionWeightMetrics(mBeanServer, resourceObjectName, partitionWeightMap); + + // "capacity2" metric should not exist in MBean server. + String removedAttribute = "capacity2Gauge"; + try { + mBeanServer.getAttribute(resourceObjectName, removedAttribute); + Assert.fail("AttributeNotFoundException should be thrown because attribute [capacity2Gauge]" + + " is removed."); + } catch (AttributeNotFoundException expected) { + } + } finally { + // Reset monitor. + monitor.unregister(); + Assert.assertFalse(mBeanServer.isRegistered(resourceObjectName), + "Failed to unregister resource monitor."); + } } /** @@ -240,4 +301,28 @@ public static ZNRecord deepCopyZNRecord(ZNRecord record) { return copy; } + + private void verifyPartitionWeightMetrics(MBeanServerConnection mBeanServer, + ObjectName objectName, Map> expectedPartitionWeightMap) + throws IOException, AttributeNotFoundException, MBeanException, ReflectionException, + InstanceNotFoundException { + final String gaugeMetricSuffix = "Gauge"; + for (Map.Entry> entry : expectedPartitionWeightMap.entrySet()) { + // Resource monitor for this resource is already registered. + Assert.assertTrue(mBeanServer.isRegistered(objectName)); + + for (Map.Entry capacityEntry : entry.getValue().entrySet()) { + String attributeName = capacityEntry.getKey() + gaugeMetricSuffix; + try { + // Wait until the attribute is already registered to mbean server. + Assert.assertTrue(TestHelper.verify( + () -> !mBeanServer.getAttributes(objectName, new String[]{attributeName}).isEmpty(), + 2000)); + } catch (Exception ignored) { + } + Assert.assertEquals((long) mBeanServer.getAttribute(objectName, attributeName), + (long) capacityEntry.getValue()); + } + } + } }