From 432b06cd34bdfd9282387eb7b0d606be30b8d947 Mon Sep 17 00:00:00 2001 From: Jiajun Wang Date: Sat, 20 Jul 2019 00:00:11 -0700 Subject: [PATCH 1/4] Adding the configuration items of the WAGED rebalancer. Including: Instance Capacity Keys, Rebalance Preferences, Instance Capacity Details, Partition Capacity (the weight) Details. Also adding test to cover the new configuration items. --- .../org/apache/helix/model/ClusterConfig.java | 129 +++++++++++++--- .../apache/helix/model/InstanceConfig.java | 62 ++++++-- .../apache/helix/model/ResourceConfig.java | 72 ++++++++- .../apache/helix/model/TestClusterConfig.java | 130 ++++++++++++++++ .../helix/model/TestInstanceConfig.java | 66 +++++++- .../helix/model/TestResourceConfig.java | 144 ++++++++++++++++++ 6 files changed, 563 insertions(+), 40 deletions(-) create mode 100644 helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java create mode 100644 helix-core/src/test/java/org/apache/helix/model/TestResourceConfig.java diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java index 5efecc98a4..67411ca0f4 100644 --- a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java +++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java @@ -19,12 +19,8 @@ * under the License. */ +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; import org.apache.helix.HelixException; import org.apache.helix.HelixProperty; import org.apache.helix.ZNRecord; @@ -32,6 +28,12 @@ import org.apache.helix.api.config.StateTransitionThrottleConfig; import org.apache.helix.api.config.StateTransitionTimeoutConfig; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + /** * Cluster configurations */ @@ -80,7 +82,19 @@ public enum ClusterConfigProperty { DISABLED_INSTANCES, // Specifies job types and used for quota allocation - QUOTA_TYPES + QUOTA_TYPES, + + // The required instance capacity keys for resource partition assignment calculation. + INSTANCE_CAPACITY_KEYS, + // The preference of the rebalance result. + // EVENNESS - Evenness of the resource utilization, partition, and top state distribution. + // LESS_MOVEMENT - the tendency of keeping the current assignment instead of moving the partition for optimal assignment. + REBALANCE_PREFERENCE + } + + public enum GlobalRebalancePreferenceKey { + EVENNESS, + LESS_MOVEMENT } private final static int DEFAULT_MAX_CONCURRENT_TASK_PER_INSTANCE = 40; @@ -95,6 +109,15 @@ public enum ClusterConfigProperty { public final static String TASK_QUOTA_RATIO_NOT_SET = "-1"; + // Default preference for all the aspects should be the same to ensure balanced setup. + public final static Map + DEFAULT_GLOBAL_REBALANCE_PREFERENCE = + ImmutableMap.builder() + .put(GlobalRebalancePreferenceKey.EVENNESS, 1) + .put(GlobalRebalancePreferenceKey.LESS_MOVEMENT, 1).build(); + private final static int MAX_REBALANCE_PREFERENCE = 10; + private final static int MIN_REBALANCE_PREFERENCE = 0; + /** * Instantiate for a specific cluster * @param cluster the cluster identifier @@ -113,21 +136,21 @@ public ClusterConfig(ZNRecord record) { /** * Set task quota type with the ratio of this quota. - * @param quotaType String + * @param quotaType String * @param quotaRatio int */ public void setTaskQuotaRatio(String quotaType, int quotaRatio) { if (_record.getMapField(ClusterConfigProperty.QUOTA_TYPES.name()) == null) { _record.setMapField(ClusterConfigProperty.QUOTA_TYPES.name(), new HashMap()); } - _record.getMapField(ClusterConfigProperty.QUOTA_TYPES.name()).put(quotaType, - Integer.toString(quotaRatio)); + _record.getMapField(ClusterConfigProperty.QUOTA_TYPES.name()) + .put(quotaType, Integer.toString(quotaRatio)); } /** * Set task quota type with the ratio of this quota. Quota ratio must be a String that is * parse-able into an int. - * @param quotaType String + * @param quotaType String * @param quotaRatio String */ public void setTaskQuotaRatio(String quotaType, String quotaRatio) { @@ -210,8 +233,8 @@ public void setPersistBestPossibleAssignment(Boolean enable) { * @return */ public Boolean isPersistIntermediateAssignment() { - return _record.getBooleanField(ClusterConfigProperty.PERSIST_INTERMEDIATE_ASSIGNMENT.toString(), - false); + return _record + .getBooleanField(ClusterConfigProperty.PERSIST_INTERMEDIATE_ASSIGNMENT.toString(), false); } /** @@ -233,8 +256,8 @@ public void setPersistIntermediateAssignment(Boolean enable) { } public Boolean isPipelineTriggersDisabled() { - return _record.getBooleanField(ClusterConfigProperty.HELIX_DISABLE_PIPELINE_TRIGGERS.toString(), - false); + return _record + .getBooleanField(ClusterConfigProperty.HELIX_DISABLE_PIPELINE_TRIGGERS.toString(), false); } /** @@ -403,8 +426,8 @@ public void setNumOfflineInstancesForAutoExit(int maintenanceRecoveryThreshold) * @return */ public int getNumOfflineInstancesForAutoExit() { - return _record.getIntField(ClusterConfigProperty.NUM_OFFLINE_INSTANCES_FOR_AUTO_EXIT.name(), - -1); + return _record + .getIntField(ClusterConfigProperty.NUM_OFFLINE_INSTANCES_FOR_AUTO_EXIT.name(), -1); } /** @@ -444,9 +467,7 @@ public boolean equals(Object obj) { if (obj instanceof ClusterConfig) { ClusterConfig that = (ClusterConfig) obj; - if (this.getId().equals(that.getId())) { - return true; - } + return this.getId().equals(that.getId()); } return false; } @@ -490,8 +511,8 @@ public void setStateTransitionThrottleConfigs( } if (!configStrs.isEmpty()) { - _record.setListField(ClusterConfigProperty.STATE_TRANSITION_THROTTLE_CONFIGS.name(), - configStrs); + _record + .setListField(ClusterConfigProperty.STATE_TRANSITION_THROTTLE_CONFIGS.name(), configStrs); } } @@ -579,7 +600,7 @@ public void setMaxConcurrentTaskPerInstance(int maxConcurrentTaskPerInstance) { public int getErrorPartitionThresholdForLoadBalance() { return _record.getIntField( ClusterConfigProperty.ERROR_PARTITION_THRESHOLD_FOR_LOAD_BALANCE.name(), - DEFAULT_ERROR_PARTITION_THRESHOLD_FOR_LOAD_BALANCE); + DEFAULT_ERROR_PARTITION_THRESHOLD_FOR_LOAD_BALANCE); } /** @@ -657,6 +678,70 @@ public void enableP2PMessage(boolean enabled) { _record.setBooleanField(HelixConfigProperty.P2P_MESSAGE_ENABLED.name(), enabled); } + /** + * Set the required Instance Capacity Keys. + * @param capacityKeys + */ + public void setInstanceCapacityKeys(List capacityKeys) { + if (capacityKeys == null || capacityKeys.isEmpty()) { + throw new IllegalArgumentException("The input instance capacity key list is empty."); + } + _record.setListField(ClusterConfigProperty.INSTANCE_CAPACITY_KEYS.name(), capacityKeys); + } + + /** + * @return The required Instance Capacity Keys. If not configured, return an empty list. + */ + public List getInstanceCapacityKeys() { + List capacityKeys = _record.getListField(ClusterConfigProperty.INSTANCE_CAPACITY_KEYS.name()); + if (capacityKeys == null) { + return Collections.emptyList(); + } + return capacityKeys; + } + + /** + * Set the global rebalancer's assignment preference. + * @param preference A map of the GlobalRebalancePreferenceKey and the corresponding weight. + * The ratio of the configured weights will determine the rebalancer's behavior. + */ + public void setGlobalRebalancePreference(Map preference) { + Map preferenceMap = new HashMap<>(); + + preference.entrySet().stream().forEach(entry -> { + if (entry.getValue() > MAX_REBALANCE_PREFERENCE + || entry.getValue() < MIN_REBALANCE_PREFERENCE) { + throw new IllegalArgumentException(String + .format("Invalid global rebalance preference configuration. Key %s, Value %d.", + entry.getKey().name(), entry.getValue())); + } + preferenceMap.put(entry.getKey().name(), Integer.toString(entry.getValue())); + }); + + _record.setMapField(ClusterConfigProperty.REBALANCE_PREFERENCE.name(), preferenceMap); + } + + /** + * Get the global rebalancer's assignment preference. + */ + public Map getGlobalRebalancePreference() { + Map preferenceStrMap = + _record.getMapField(ClusterConfigProperty.REBALANCE_PREFERENCE.name()); + if (preferenceStrMap != null && !preferenceStrMap.isEmpty()) { + Map preference = new HashMap<>(); + for (GlobalRebalancePreferenceKey key : GlobalRebalancePreferenceKey.values()) { + if (!preferenceStrMap.containsKey(key.name())) { + // If any key is not configured with a value, return the default config. + return DEFAULT_GLOBAL_REBALANCE_PREFERENCE; + } + preference.put(key, Integer.parseInt(preferenceStrMap.get(key.name()))); + } + return preference; + } + // If configuration is not complete, return the default one. + return DEFAULT_GLOBAL_REBALANCE_PREFERENCE; + } + /** * Get IdealState rules defined in the cluster config. * @return diff --git a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java index f65a1bd42e..42092c88e5 100644 --- a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java +++ b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java @@ -19,6 +19,14 @@ * under the License. */ +import com.google.common.base.Splitter; +import org.apache.helix.HelixException; +import org.apache.helix.HelixProperty; +import org.apache.helix.ZNRecord; +import org.apache.helix.util.HelixUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -28,15 +36,6 @@ import java.util.Map; import java.util.Set; -import org.apache.helix.HelixException; -import org.apache.helix.HelixProperty; -import org.apache.helix.ZNRecord; -import org.apache.helix.util.HelixUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Splitter; - /** * Instance configurations */ @@ -55,7 +54,8 @@ public enum InstanceConfigProperty { INSTANCE_WEIGHT, DOMAIN, DELAY_REBALANCE_ENABLED, - MAX_CONCURRENT_TASK + MAX_CONCURRENT_TASK, + INSTANCE_CAPACITY_MAP } public static final int WEIGHT_NOT_SET = -1; @@ -505,6 +505,48 @@ public void setMaxConcurrentTask(int maxConcurrentTask) { _record.setIntField(InstanceConfigProperty.MAX_CONCURRENT_TASK.name(), maxConcurrentTask); } + /** + * Get the instance capacity information from the map fields + * + * @return data map if it exists, or empty map + */ + public Map getInstanceCapacityMap() { + Map capacityData = + _record.getMapField(InstanceConfigProperty.INSTANCE_CAPACITY_MAP.name()); + Map capacityDataMap = new HashMap<>(); + + if (capacityData != null) { + capacityData.entrySet().stream().forEach( + entry -> capacityDataMap.put(entry.getKey(), Integer.parseInt(entry.getValue()))); + } + return capacityDataMap; + } + + /** + * Set the instance capacity information with an Integer mapping + * @param capacityDataMap - map of instance capacity data + * @throws IllegalArgumentException - when any of the data value is a negative number or when the map is empty + */ + public void setInstanceCapacityMap(Map capacityDataMap) + throws IllegalArgumentException { + if (capacityDataMap == null || capacityDataMap.size() == 0) { + throw new IllegalArgumentException("Capacity Data is empty"); + } + + Map capacityData = new HashMap<>(); + + capacityDataMap.entrySet().stream().forEach(entry -> { + if (entry.getValue() < 0) { + throw new IllegalArgumentException(String + .format("Capacity Data contains a negative value: %s = %d", entry.getKey(), + entry.getValue())); + } + capacityData.put(entry.getKey(), Integer.toString(entry.getValue())); + }); + + _record.setMapField(InstanceConfigProperty.INSTANCE_CAPACITY_MAP.name(), capacityData); + } + @Override public boolean equals(Object obj) { if (obj instanceof InstanceConfig) { diff --git a/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java b/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java index 274640c5b3..1b57448927 100644 --- a/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java +++ b/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java @@ -19,19 +19,23 @@ * under the License. */ -import java.util.Collections; -import java.util.List; -import java.util.Map; - -import java.util.TreeMap; import org.apache.helix.HelixProperty; import org.apache.helix.ZNRecord; import org.apache.helix.api.config.HelixConfigProperty; import org.apache.helix.api.config.RebalanceConfig; import org.apache.helix.api.config.StateTransitionTimeoutConfig; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.type.TypeReference; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + /** * Resource configurations */ @@ -53,7 +57,8 @@ public enum ResourceConfigProperty { RESOURCE_TYPE, GROUP_ROUTING_ENABLED, EXTERNAL_VIEW_DISABLED, - DELAY_REBALANCE_ENABLED + DELAY_REBALANCE_ENABLED, + PARTITION_CAPACITY_MAP } public enum ResourceConfigConstants { @@ -61,6 +66,10 @@ public enum ResourceConfigConstants { } private static final Logger _logger = LoggerFactory.getLogger(ResourceConfig.class.getName()); + private static final ObjectMapper _objectMapper = new ObjectMapper(); + + public static final String DEFAULT_PARTITION_KEY = "DEFAULT"; + /** * Instantiate for a specific instance * @@ -349,6 +358,57 @@ public void setPreferenceLists(Map> instanceLists) { _record.setListFields(instanceLists); } + /** + * Get the partition capacity information from a JSON among the map fields. + * > + * + * @return data map if it exists, or empty map + * @throws IOException - when JSON conversion fails + */ + public Map> getPartitionCapacityMap() throws IOException { + Map capacityData = + _record.getMapField(ResourceConfigProperty.PARTITION_CAPACITY_MAP.name()); + Map> partitionCapacityMap = new HashMap<>(); + if (capacityData != null) { + for (String key : capacityData.keySet()) { + Map capacities = _objectMapper + .readValue(capacityData.get(key), new TypeReference>() {}); + partitionCapacityMap.put(key, capacities); + } + } + return partitionCapacityMap; + } + + /** + * Set the partition capacity information with a map > + * + * @param partitionCapacityMap - map of partition capacity data + * @throws IllegalArgumentException - when any of the data value is a negative number or map is empty + * @throws IOException - when JSON parsing fails + */ + public void setPartitionCapacityMap(Map> partitionCapacityMap) + throws IllegalArgumentException, IOException { + if (partitionCapacityMap == null || partitionCapacityMap.isEmpty()) { + throw new IllegalArgumentException("Capacity Map is empty"); + } + + // Verify the input is valid + Map newCapacityRecord = new HashMap<>(); + for (String key : partitionCapacityMap.keySet()) { + Map capacities = partitionCapacityMap.get(key); + if (capacities.isEmpty()) { + throw new IllegalArgumentException("Capacity Data is empty"); + } + if (capacities.entrySet().stream().anyMatch(entry -> entry.getValue() < 0)) { + throw new IllegalArgumentException( + String.format("Capacity Data contains a negative value:%s", capacities.toString())); + } + newCapacityRecord.put(key, _objectMapper.writeValueAsString(capacities)); + } + + _record.setMapField(ResourceConfigProperty.PARTITION_CAPACITY_MAP.name(), newCapacityRecord); + } + /** * Put a set of simple configs. * diff --git a/helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java b/helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java new file mode 100644 index 0000000000..209b196000 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java @@ -0,0 +1,130 @@ +package org.apache.helix.model; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import com.google.common.collect.ImmutableList; +import org.testng.Assert; +import org.testng.annotations.Test; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.helix.model.ClusterConfig.GlobalRebalancePreferenceKey.EVENNESS; +import static org.apache.helix.model.ClusterConfig.GlobalRebalancePreferenceKey.LESS_MOVEMENT; + +public class TestClusterConfig { + + @Test + public void testGetCapacityKeys() { + List keys = ImmutableList.of("CPU", "MEMORY", "Random"); + + ClusterConfig testConfig = new ClusterConfig("testId"); + testConfig.getRecord() + .setListField(ClusterConfig.ClusterConfigProperty.INSTANCE_CAPACITY_KEYS.name(), keys); + + Assert.assertEquals(testConfig.getInstanceCapacityKeys(), keys); + } + + @Test + public void testGetCapacityKeysEmpty() { + ClusterConfig testConfig = new ClusterConfig("testId"); + Assert.assertEquals(testConfig.getInstanceCapacityKeys(), Collections.emptyList()); + } + + @Test + public void testSetCapacityKeys() { + List keys = ImmutableList.of("CPU", "MEMORY", "Random"); + + ClusterConfig testConfig = new ClusterConfig("testId"); + testConfig.setInstanceCapacityKeys(keys); + + Assert.assertEquals(keys, testConfig.getRecord() + .getListField(ClusterConfig.ClusterConfigProperty.INSTANCE_CAPACITY_KEYS.name())); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void testSetCapacityKeysEmptyList() { + ClusterConfig testConfig = new ClusterConfig("testId"); + testConfig.setInstanceCapacityKeys(Collections.emptyList()); + } + + @Test + public void testGetRebalancePreference() { + Map preference = new HashMap<>(); + preference.put(EVENNESS, 5); + preference.put(LESS_MOVEMENT, 3); + + Map mapFieldData = new HashMap<>(); + for (ClusterConfig.GlobalRebalancePreferenceKey key : preference.keySet()) { + mapFieldData.put(key.name(), String.valueOf(preference.get(key))); + } + + ClusterConfig testConfig = new ClusterConfig("testId"); + testConfig.getRecord() + .setMapField(ClusterConfig.ClusterConfigProperty.REBALANCE_PREFERENCE.name(), mapFieldData); + + Assert.assertEquals(testConfig.getGlobalRebalancePreference(), preference); + } + + @Test + public void testGetRebalancePreferenceDefault() { + ClusterConfig testConfig = new ClusterConfig("testId"); + Assert.assertEquals(testConfig.getGlobalRebalancePreference(), + ClusterConfig.DEFAULT_GLOBAL_REBALANCE_PREFERENCE); + + Map preference = new HashMap<>(); + preference.put(EVENNESS, 5); + testConfig.setGlobalRebalancePreference(preference); + + Assert.assertEquals(testConfig.getGlobalRebalancePreference(), + ClusterConfig.DEFAULT_GLOBAL_REBALANCE_PREFERENCE); + } + + @Test + public void testSetRebalancePreference() { + Map preference = new HashMap<>(); + preference.put(EVENNESS, 5); + preference.put(LESS_MOVEMENT, 3); + + Map mapFieldData = new HashMap<>(); + for (ClusterConfig.GlobalRebalancePreferenceKey key : preference.keySet()) { + mapFieldData.put(key.name(), String.valueOf(preference.get(key))); + } + + ClusterConfig testConfig = new ClusterConfig("testId"); + testConfig.setGlobalRebalancePreference(preference); + + Assert.assertEquals(testConfig.getRecord() + .getMapField(ClusterConfig.ClusterConfigProperty.REBALANCE_PREFERENCE.name()), + mapFieldData); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void testSetRebalancePreferenceInvalidNumber() { + Map preference = new HashMap<>(); + preference.put(EVENNESS, -1); + preference.put(LESS_MOVEMENT, 3); + + ClusterConfig testConfig = new ClusterConfig("testId"); + testConfig.setGlobalRebalancePreference(preference); + } +} diff --git a/helix-core/src/test/java/org/apache/helix/model/TestInstanceConfig.java b/helix-core/src/test/java/org/apache/helix/model/TestInstanceConfig.java index 38b1c92369..f0da05f6d4 100644 --- a/helix-core/src/test/java/org/apache/helix/model/TestInstanceConfig.java +++ b/helix-core/src/test/java/org/apache/helix/model/TestInstanceConfig.java @@ -19,12 +19,14 @@ * under the License. */ -import java.util.Map; - +import com.google.common.collect.ImmutableMap; import org.apache.helix.ZNRecord; import org.testng.Assert; import org.testng.annotations.Test; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; /** * Created with IntelliJ IDEA. @@ -58,4 +60,64 @@ public void testGetParsedDomain_emptyDomain() { Map parsedDomain = instanceConfig.getDomainAsMap(); Assert.assertTrue(parsedDomain.isEmpty()); } + + @Test + public void testGetInstanceCapacityMap() { + Map capacityDataMap = ImmutableMap.of("item1", 1, + "item2", 2, + "item3", 3); + + Map capacityDataMapString = ImmutableMap.of("item1", "1", + "item2", "2", + "item3", "3"); + + ZNRecord rec = new ZNRecord("testId"); + rec.setMapField(InstanceConfig.InstanceConfigProperty.INSTANCE_CAPACITY_MAP.name(), capacityDataMapString); + InstanceConfig testConfig = new InstanceConfig(rec); + + Assert.assertTrue(testConfig.getInstanceCapacityMap().equals(capacityDataMap)); + } + + @Test + public void testGetInstanceCapacityMapEmpty() { + InstanceConfig testConfig = new InstanceConfig("testId"); + + Assert.assertTrue(testConfig.getInstanceCapacityMap().equals(Collections.emptyMap())); + } + + @Test + public void testSetInstanceCapacityMap() { + Map capacityDataMap = ImmutableMap.of("item1", 1, + "item2", 2, + "item3", 3); + + Map capacityDataMapString = ImmutableMap.of("item1", "1", + "item2", "2", + "item3", "3"); + + InstanceConfig testConfig = new InstanceConfig("testConfig"); + testConfig.setInstanceCapacityMap(capacityDataMap); + + Assert.assertEquals(testConfig.getRecord().getMapField(InstanceConfig.InstanceConfigProperty. + INSTANCE_CAPACITY_MAP.name()), capacityDataMapString); + } + + @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Capacity Data is empty") + public void testSetInstanceCapacityMapEmpty() { + Map capacityDataMap = new HashMap<>(); + + InstanceConfig testConfig = new InstanceConfig("testConfig"); + testConfig.setInstanceCapacityMap(capacityDataMap); + } + + @Test(expectedExceptions = IllegalArgumentException.class, + expectedExceptionsMessageRegExp = "Capacity Data contains a negative value: item3 = -3") + public void testSetInstanceCapacityMapInvalid() { + Map capacityDataMap = ImmutableMap.of("item1", 1, + "item2", 2, + "item3", -3); + + InstanceConfig testConfig = new InstanceConfig("testConfig"); + testConfig.setInstanceCapacityMap(capacityDataMap); + } } diff --git a/helix-core/src/test/java/org/apache/helix/model/TestResourceConfig.java b/helix-core/src/test/java/org/apache/helix/model/TestResourceConfig.java new file mode 100644 index 0000000000..8f8727686f --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/model/TestResourceConfig.java @@ -0,0 +1,144 @@ +package org.apache.helix.model; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import com.google.common.collect.ImmutableMap; +import org.apache.helix.ZNRecord; +import org.codehaus.jackson.map.ObjectMapper; +import org.testng.Assert; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +public class TestResourceConfig { + private static final ObjectMapper _objectMapper = new ObjectMapper(); + + @Test + public void testGetPartitionCapacityMap() throws IOException { + Map capacityDataMap = ImmutableMap.of("item1", 1, + "item2", 2, + "item3", 3); + + ZNRecord rec = new ZNRecord("testId"); + rec.setMapField(ResourceConfig.ResourceConfigProperty.PARTITION_CAPACITY_MAP.name(), Collections + .singletonMap(ResourceConfig.DEFAULT_PARTITION_KEY, + _objectMapper.writeValueAsString(capacityDataMap))); + ResourceConfig testConfig = new ResourceConfig(rec); + + Assert.assertTrue(testConfig.getPartitionCapacityMap().get(ResourceConfig.DEFAULT_PARTITION_KEY) + .equals(capacityDataMap)); + } + + @Test + public void testGetPartitionCapacityMapEmpty() throws IOException { + ResourceConfig testConfig = new ResourceConfig("testId"); + + Assert.assertTrue(testConfig.getPartitionCapacityMap().equals(Collections.emptyMap())); + } + + @Test(expectedExceptions = IOException.class) + public void testGetPartitionCapacityMapInvalidJson() throws IOException { + ZNRecord rec = new ZNRecord("testId"); + rec.setMapField(ResourceConfig.ResourceConfigProperty.PARTITION_CAPACITY_MAP.name(), + Collections.singletonMap("test", "gibberish")); + ResourceConfig testConfig = new ResourceConfig(rec); + + testConfig.getPartitionCapacityMap(); + } + + @Test(dependsOnMethods = "testGetPartitionCapacityMap", expectedExceptions = IOException.class) + public void testGetPartitionCapacityMapInvalidJsonType() throws IOException { + Map capacityDataMap = ImmutableMap.of("item1", "1", + "item2", "2", + "item3", "three"); + + ZNRecord rec = new ZNRecord("testId"); + rec.setMapField(ResourceConfig.ResourceConfigProperty.PARTITION_CAPACITY_MAP.name(), Collections + .singletonMap(ResourceConfig.DEFAULT_PARTITION_KEY, + _objectMapper.writeValueAsString(capacityDataMap))); + ResourceConfig testConfig = new ResourceConfig(rec); + + testConfig.getPartitionCapacityMap(); + } + + @Test + public void testSetPartitionCapacityMap() throws IOException { + Map capacityDataMap = ImmutableMap.of("item1", 1, + "item2", 2, + "item3", 3); + + ResourceConfig testConfig = new ResourceConfig("testConfig"); + testConfig.setPartitionCapacityMap( + Collections.singletonMap(ResourceConfig.DEFAULT_PARTITION_KEY, capacityDataMap)); + + Assert.assertEquals(testConfig.getRecord().getMapField(ResourceConfig.ResourceConfigProperty. + PARTITION_CAPACITY_MAP.name()).get(ResourceConfig.DEFAULT_PARTITION_KEY), + _objectMapper.writeValueAsString(capacityDataMap)); + } + + @Test + public void testSetMultiplePartitionCapacityMap() throws IOException { + Map capacityDataMap = ImmutableMap.of("item1", 1, + "item2", 2, + "item3", 3); + + Map> totalCapacityMap = ImmutableMap.of("partition1", capacityDataMap, + "partition2", capacityDataMap, + "partition3", capacityDataMap); + + ResourceConfig testConfig = new ResourceConfig("testConfig"); + testConfig.setPartitionCapacityMap(totalCapacityMap); + + Assert.assertNull(testConfig.getRecord().getMapField(ResourceConfig.ResourceConfigProperty. + PARTITION_CAPACITY_MAP.name()).get(ResourceConfig.DEFAULT_PARTITION_KEY)); + Assert.assertEquals(testConfig.getRecord().getMapField(ResourceConfig.ResourceConfigProperty. + PARTITION_CAPACITY_MAP.name()).get("partition1"), + _objectMapper.writeValueAsString(capacityDataMap)); + Assert.assertEquals(testConfig.getRecord().getMapField(ResourceConfig.ResourceConfigProperty. + PARTITION_CAPACITY_MAP.name()).get("partition2"), + _objectMapper.writeValueAsString(capacityDataMap)); + Assert.assertEquals(testConfig.getRecord().getMapField(ResourceConfig.ResourceConfigProperty. + PARTITION_CAPACITY_MAP.name()).get("partition3"), + _objectMapper.writeValueAsString(capacityDataMap)); + } + + @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Capacity Data is empty") + public void testSetPartitionCapacityMapEmpty() throws IOException { + Map capacityDataMap = new HashMap<>(); + + ResourceConfig testConfig = new ResourceConfig("testConfig"); + testConfig.setPartitionCapacityMap( + Collections.singletonMap(ResourceConfig.DEFAULT_PARTITION_KEY, capacityDataMap)); + } + + @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Capacity Data contains a negative value:.+") + public void testSetPartitionCapacityMapInvalid() throws IOException { + Map capacityDataMap = ImmutableMap.of("item1", 1, + "item2", 2, + "item3", -3); + + ResourceConfig testConfig = new ResourceConfig("testConfig"); + testConfig.setPartitionCapacityMap( + Collections.singletonMap(ResourceConfig.DEFAULT_PARTITION_KEY, capacityDataMap)); + } +} From 18dbc696688f41d00758eb56c02920089afae79c Mon Sep 17 00:00:00 2001 From: Jiajun Wang Date: Thu, 25 Jul 2019 23:01:19 -0700 Subject: [PATCH 2/4] Add check for the DEFAULT partition capacity configuration in the ResourceConfig. The DEFAULT partition capacity item is required. --- .../apache/helix/model/ResourceConfig.java | 27 ++++++++++++------- .../helix/model/TestResourceConfig.java | 16 ++++++++--- 2 files changed, 30 insertions(+), 13 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java b/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java index 1b57448927..fccca89d0b 100644 --- a/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java +++ b/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java @@ -360,27 +360,29 @@ public void setPreferenceLists(Map> instanceLists) { /** * Get the partition capacity information from a JSON among the map fields. - * > + * > * * @return data map if it exists, or empty map * @throws IOException - when JSON conversion fails */ public Map> getPartitionCapacityMap() throws IOException { - Map capacityData = + Map partitionCapacityData = _record.getMapField(ResourceConfigProperty.PARTITION_CAPACITY_MAP.name()); Map> partitionCapacityMap = new HashMap<>(); - if (capacityData != null) { - for (String key : capacityData.keySet()) { + if (partitionCapacityData != null) { + for (String partition : partitionCapacityData.keySet()) { Map capacities = _objectMapper - .readValue(capacityData.get(key), new TypeReference>() {}); - partitionCapacityMap.put(key, capacities); + .readValue(partitionCapacityData.get(partition), + new TypeReference>() { + }); + partitionCapacityMap.put(partition, capacities); } } return partitionCapacityMap; } /** - * Set the partition capacity information with a map > + * Set the partition capacity information with a map > * * @param partitionCapacityMap - map of partition capacity data * @throws IllegalArgumentException - when any of the data value is a negative number or map is empty @@ -391,11 +393,16 @@ public void setPartitionCapacityMap(Map> partitionC if (partitionCapacityMap == null || partitionCapacityMap.isEmpty()) { throw new IllegalArgumentException("Capacity Map is empty"); } + if (!partitionCapacityMap.containsKey(DEFAULT_PARTITION_KEY)) { + throw new IllegalArgumentException(String + .format("The default partition capacity with the default key %s is required.", + DEFAULT_PARTITION_KEY)); + } // Verify the input is valid Map newCapacityRecord = new HashMap<>(); - for (String key : partitionCapacityMap.keySet()) { - Map capacities = partitionCapacityMap.get(key); + for (String partition : partitionCapacityMap.keySet()) { + Map capacities = partitionCapacityMap.get(partition); if (capacities.isEmpty()) { throw new IllegalArgumentException("Capacity Data is empty"); } @@ -403,7 +410,7 @@ public void setPartitionCapacityMap(Map> partitionC throw new IllegalArgumentException( String.format("Capacity Data contains a negative value:%s", capacities.toString())); } - newCapacityRecord.put(key, _objectMapper.writeValueAsString(capacities)); + newCapacityRecord.put(partition, _objectMapper.writeValueAsString(capacities)); } _record.setMapField(ResourceConfigProperty.PARTITION_CAPACITY_MAP.name(), newCapacityRecord); diff --git a/helix-core/src/test/java/org/apache/helix/model/TestResourceConfig.java b/helix-core/src/test/java/org/apache/helix/model/TestResourceConfig.java index 8f8727686f..241787ff49 100644 --- a/helix-core/src/test/java/org/apache/helix/model/TestResourceConfig.java +++ b/helix-core/src/test/java/org/apache/helix/model/TestResourceConfig.java @@ -102,7 +102,8 @@ public void testSetMultiplePartitionCapacityMap() throws IOException { "item2", 2, "item3", 3); - Map> totalCapacityMap = ImmutableMap.of("partition1", capacityDataMap, + Map> totalCapacityMap = + ImmutableMap.of(ResourceConfig.DEFAULT_PARTITION_KEY, capacityDataMap, "partition2", capacityDataMap, "partition3", capacityDataMap); @@ -110,9 +111,9 @@ public void testSetMultiplePartitionCapacityMap() throws IOException { testConfig.setPartitionCapacityMap(totalCapacityMap); Assert.assertNull(testConfig.getRecord().getMapField(ResourceConfig.ResourceConfigProperty. - PARTITION_CAPACITY_MAP.name()).get(ResourceConfig.DEFAULT_PARTITION_KEY)); + PARTITION_CAPACITY_MAP.name()).get("partition1")); Assert.assertEquals(testConfig.getRecord().getMapField(ResourceConfig.ResourceConfigProperty. - PARTITION_CAPACITY_MAP.name()).get("partition1"), + PARTITION_CAPACITY_MAP.name()).get(ResourceConfig.DEFAULT_PARTITION_KEY), _objectMapper.writeValueAsString(capacityDataMap)); Assert.assertEquals(testConfig.getRecord().getMapField(ResourceConfig.ResourceConfigProperty. PARTITION_CAPACITY_MAP.name()).get("partition2"), @@ -131,6 +132,15 @@ public void testSetPartitionCapacityMapEmpty() throws IOException { Collections.singletonMap(ResourceConfig.DEFAULT_PARTITION_KEY, capacityDataMap)); } + @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "The default partition capacity with the default key DEFAULT is required.") + public void testSetPartitionCapacityMapWithoutDefault() throws IOException { + Map capacityDataMap = new HashMap<>(); + + ResourceConfig testConfig = new ResourceConfig("testConfig"); + testConfig.setPartitionCapacityMap( + Collections.singletonMap("Random", capacityDataMap)); + } + @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Capacity Data contains a negative value:.+") public void testSetPartitionCapacityMapInvalid() throws IOException { Map capacityDataMap = ImmutableMap.of("item1", 1, From 644a84e7a03975bb5de7a03ce89ca1ff27510cbe Mon Sep 17 00:00:00 2001 From: Jiajun Wang Date: Thu, 25 Jul 2019 23:34:21 -0700 Subject: [PATCH 3/4] Adding the capacity parameter to ResourceConfig Builder. Also refine the test case to cover new methods. --- .../apache/helix/model/InstanceConfig.java | 8 ++-- .../apache/helix/model/ResourceConfig.java | 44 ++++++++++++++++++- .../helix/model/TestResourceConfig.java | 32 ++++++++++++++ 3 files changed, 78 insertions(+), 6 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java index 42092c88e5..88fd1ddd40 100644 --- a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java +++ b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java @@ -35,6 +35,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; /** * Instance configurations @@ -513,13 +514,12 @@ public void setMaxConcurrentTask(int maxConcurrentTask) { public Map getInstanceCapacityMap() { Map capacityData = _record.getMapField(InstanceConfigProperty.INSTANCE_CAPACITY_MAP.name()); - Map capacityDataMap = new HashMap<>(); if (capacityData != null) { - capacityData.entrySet().stream().forEach( - entry -> capacityDataMap.put(entry.getKey(), Integer.parseInt(entry.getValue()))); + return capacityData.entrySet().stream().collect( + Collectors.toMap(entry -> entry.getKey(), entry -> Integer.parseInt(entry.getValue()))); } - return capacityDataMap; + return Collections.emptyMap(); } /** diff --git a/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java b/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java index fccca89d0b..c40b05ecbd 100644 --- a/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java +++ b/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java @@ -19,6 +19,7 @@ * under the License. */ +import org.apache.helix.HelixException; import org.apache.helix.HelixProperty; import org.apache.helix.ZNRecord; import org.apache.helix.api.config.HelixConfigProperty; @@ -399,10 +400,10 @@ public void setPartitionCapacityMap(Map> partitionC DEFAULT_PARTITION_KEY)); } - // Verify the input is valid Map newCapacityRecord = new HashMap<>(); for (String partition : partitionCapacityMap.keySet()) { Map capacities = partitionCapacityMap.get(partition); + // Verify the input is valid if (capacities.isEmpty()) { throw new IllegalArgumentException("Capacity Data is empty"); } @@ -543,6 +544,7 @@ public static class Builder { private StateTransitionTimeoutConfig _stateTransitionTimeoutConfig; private Map> _preferenceLists; private Map> _mapFields; + private Map> _partitionCapacityMap; public Builder(String resourceId) { _resourceId = resourceId; @@ -731,6 +733,23 @@ public Map> getPreferenceLists() { return _preferenceLists; } + public Builder setPartitionCapacity(Map defaultCapacity) { + setPartitionCapacity(DEFAULT_PARTITION_KEY, defaultCapacity); + return this; + } + + public Builder setPartitionCapacity(String partition, Map capacity) { + if (_partitionCapacityMap == null) { + _partitionCapacityMap = new HashMap<>(); + } + _partitionCapacityMap.put(partition, capacity); + return this; + } + + public Map getPartitionCapacity(String partition) { + return _partitionCapacityMap.get(partition); + } + public Builder setMapField(String key, Map fields) { if (_mapFields == null) { _mapFields = new TreeMap<>(); @@ -775,17 +794,38 @@ private void validate() { } } } + + if (_partitionCapacityMap != null) { + if (_partitionCapacityMap.keySet().stream() + .noneMatch(partition -> partition.equals(DEFAULT_PARTITION_KEY))) { + throw new IllegalArgumentException( + "Partition capacity is configured without the DEFAULT capacity!"); + } + if (_partitionCapacityMap.values().stream() + .anyMatch(capacity -> capacity.values().stream().anyMatch(value -> value < 0))) { + throw new IllegalArgumentException( + "Partition capacity is configured with negative capacity value!"); + } + } } public ResourceConfig build() { // TODO: Reenable the validation in the future when ResourceConfig is ready. // validate(); - return new ResourceConfig(_resourceId, _monitorDisabled, _numPartitions, _stateModelDefRef, + ResourceConfig config = new ResourceConfig(_resourceId, _monitorDisabled, _numPartitions, _stateModelDefRef, _stateModelFactoryName, _numReplica, _minActiveReplica, _maxPartitionsPerInstance, _instanceGroupTag, _helixEnabled, _resourceGroupName, _resourceType, _groupRoutingEnabled, _externalViewDisabled, _rebalanceConfig, _stateTransitionTimeoutConfig, _preferenceLists, _mapFields, _p2pMessageEnabled); + if (_partitionCapacityMap != null) { + try { + config.setPartitionCapacityMap(_partitionCapacityMap); + } catch (IOException e) { + throw new HelixException("Failed to generate the capacity configuration.", e); + } + } + return config; } } } diff --git a/helix-core/src/test/java/org/apache/helix/model/TestResourceConfig.java b/helix-core/src/test/java/org/apache/helix/model/TestResourceConfig.java index 241787ff49..8099486b05 100644 --- a/helix-core/src/test/java/org/apache/helix/model/TestResourceConfig.java +++ b/helix-core/src/test/java/org/apache/helix/model/TestResourceConfig.java @@ -151,4 +151,36 @@ public void testSetPartitionCapacityMapInvalid() throws IOException { testConfig.setPartitionCapacityMap( Collections.singletonMap(ResourceConfig.DEFAULT_PARTITION_KEY, capacityDataMap)); } + + @Test + public void testWithResourceBuilder() throws IOException { + Map capacityDataMap = ImmutableMap.of("item1", 1, + "item2", 2, + "item3", 3); + + ResourceConfig.Builder builder = new ResourceConfig.Builder("testConfig"); + builder.setPartitionCapacity(capacityDataMap); + builder.setPartitionCapacity("partition1", capacityDataMap); + + Assert.assertEquals( + builder.build().getPartitionCapacityMap().get(ResourceConfig.DEFAULT_PARTITION_KEY), + capacityDataMap); + Assert.assertEquals( + builder.build().getPartitionCapacityMap().get("partition1"), + capacityDataMap); + Assert.assertNull( + builder.build().getPartitionCapacityMap().get("Random")); + } + + @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "The default partition capacity with the default key DEFAULT is required.") + public void testWithResourceBuilderInvalidInput() { + Map capacityDataMap = ImmutableMap.of("item1", 1, + "item2", 2, + "item3", 3); + + ResourceConfig.Builder builder = new ResourceConfig.Builder("testConfig"); + builder.setPartitionCapacity("Random", capacityDataMap); + + builder.build(); + } } From 2839a74412894d93497a23dff49ebaffe4d70067 Mon Sep 17 00:00:00 2001 From: jiajunwang Date: Fri, 26 Jul 2019 11:33:35 -0700 Subject: [PATCH 4/4] Create a new private constructor for the resouce config builder. --- .../apache/helix/model/ResourceConfig.java | 40 +++++++++++++------ 1 file changed, 27 insertions(+), 13 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java b/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java index c40b05ecbd..1ead08e128 100644 --- a/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java +++ b/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java @@ -19,7 +19,6 @@ * under the License. */ -import org.apache.helix.HelixException; import org.apache.helix.HelixProperty; import org.apache.helix.ZNRecord; import org.apache.helix.api.config.HelixConfigProperty; @@ -102,10 +101,24 @@ public ResourceConfig(String resourceId, Boolean monitorDisabled, int numPartiti String stateModelDefRef, String stateModelFactoryName, String numReplica, int minActiveReplica, int maxPartitionsPerInstance, String instanceGroupTag, Boolean helixEnabled, String resourceGroupName, String resourceType, - Boolean groupRoutingEnabled, Boolean externalViewDisabled, - RebalanceConfig rebalanceConfig, StateTransitionTimeoutConfig stateTransitionTimeoutConfig, + Boolean groupRoutingEnabled, Boolean externalViewDisabled, RebalanceConfig rebalanceConfig, + StateTransitionTimeoutConfig stateTransitionTimeoutConfig, Map> listFields, Map> mapFields, Boolean p2pMessageEnabled) { + this(resourceId, monitorDisabled, numPartitions, stateModelDefRef, stateModelFactoryName, + numReplica, minActiveReplica, maxPartitionsPerInstance, instanceGroupTag, helixEnabled, + resourceGroupName, resourceType, groupRoutingEnabled, externalViewDisabled, rebalanceConfig, + stateTransitionTimeoutConfig, listFields, mapFields, p2pMessageEnabled, null); + } + + private ResourceConfig(String resourceId, Boolean monitorDisabled, int numPartitions, + String stateModelDefRef, String stateModelFactoryName, String numReplica, + int minActiveReplica, int maxPartitionsPerInstance, String instanceGroupTag, + Boolean helixEnabled, String resourceGroupName, String resourceType, + Boolean groupRoutingEnabled, Boolean externalViewDisabled, + RebalanceConfig rebalanceConfig, StateTransitionTimeoutConfig stateTransitionTimeoutConfig, + Map> listFields, Map> mapFields, + Boolean p2pMessageEnabled, Map> partitionCapacityMap) { super(resourceId); if (monitorDisabled != null) { @@ -182,6 +195,15 @@ public ResourceConfig(String resourceId, Boolean monitorDisabled, int numPartiti if (mapFields != null) { _record.setMapFields(mapFields); } + + if (partitionCapacityMap != null) { + try { + setPartitionCapacityMap(partitionCapacityMap); + } catch (IOException e) { + throw new IllegalArgumentException( + "Failed to set partition capacity. Invalid capacity configuration."); + } + } } @@ -813,19 +835,11 @@ public ResourceConfig build() { // TODO: Reenable the validation in the future when ResourceConfig is ready. // validate(); - ResourceConfig config = new ResourceConfig(_resourceId, _monitorDisabled, _numPartitions, _stateModelDefRef, + return new ResourceConfig(_resourceId, _monitorDisabled, _numPartitions, _stateModelDefRef, _stateModelFactoryName, _numReplica, _minActiveReplica, _maxPartitionsPerInstance, _instanceGroupTag, _helixEnabled, _resourceGroupName, _resourceType, _groupRoutingEnabled, _externalViewDisabled, _rebalanceConfig, _stateTransitionTimeoutConfig, _preferenceLists, - _mapFields, _p2pMessageEnabled); - if (_partitionCapacityMap != null) { - try { - config.setPartitionCapacityMap(_partitionCapacityMap); - } catch (IOException e) { - throw new HelixException("Failed to generate the capacity configuration.", e); - } - } - return config; + _mapFields, _p2pMessageEnabled, _partitionCapacityMap); } } }