Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,13 @@
* Exception thrown by Helix due to rebalance failures.
*/
public class HelixRebalanceException extends Exception {
// TODO: Adding static description or other necessary fields into the enum instances for
// TODO: supporting the rebalance monitor to understand the exception.
public enum Type {
INVALID_CLUSTER_STATUS,
INVALID_REBALANCER_STATUS,
FAILED_TO_CALCULATE,
INVALID_INPUT,
Comment thread
jiajunwang marked this conversation as resolved.
UNKNOWN_FAILURE
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@
*/

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import com.google.common.annotations.VisibleForTesting;
import org.I0Itec.zkclient.exception.ZkNoNodeException;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.apache.helix.BucketDataAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixProperty;
import org.apache.helix.ZNRecord;
import org.apache.helix.manager.zk.ZNRecordJacksonSerializer;
Expand All @@ -50,33 +52,42 @@ public class AssignmentMetadataStore {
private Map<String, ResourceAssignment> _globalBaseline;
private Map<String, ResourceAssignment> _bestPossibleAssignment;

AssignmentMetadataStore(String metadataStoreAddrs, String clusterName) {
this(new ZkBucketDataAccessor(metadataStoreAddrs), clusterName);
}

AssignmentMetadataStore(BucketDataAccessor bucketDataAccessor, String clusterName) {
_dataAccessor = bucketDataAccessor;
_baselinePath = String.format(BASELINE_TEMPLATE, clusterName, ASSIGNMENT_METADATA_KEY);
_bestPossiblePath = String.format(BEST_POSSIBLE_TEMPLATE, clusterName, ASSIGNMENT_METADATA_KEY);
}

AssignmentMetadataStore(HelixManager helixManager) {
this(new ZkBucketDataAccessor(helixManager.getMetadataStoreConnectionString()),
helixManager.getClusterName());
}

public Map<String, ResourceAssignment> getBaseline() {
// Return the in-memory baseline. If null, read from ZK. This is to minimize reads from ZK
if (_globalBaseline == null) {
HelixProperty baseline =
_dataAccessor.compressedBucketRead(_baselinePath, HelixProperty.class);
_globalBaseline = splitAssignments(baseline);
try {
HelixProperty baseline =
_dataAccessor.compressedBucketRead(_baselinePath, HelixProperty.class);
_globalBaseline = splitAssignments(baseline);
} catch (ZkNoNodeException ex) {
// Metadata does not exist, so return an empty map
_globalBaseline = Collections.emptyMap();
}
}
return _globalBaseline;
}

public Map<String, ResourceAssignment> getBestPossibleAssignment() {
// Return the in-memory baseline. If null, read from ZK. This is to minimize reads from ZK
if (_bestPossibleAssignment == null) {
HelixProperty baseline =
_dataAccessor.compressedBucketRead(_bestPossiblePath, HelixProperty.class);
_bestPossibleAssignment = splitAssignments(baseline);
try {
HelixProperty baseline =
_dataAccessor.compressedBucketRead(_bestPossiblePath, HelixProperty.class);
_bestPossibleAssignment = splitAssignments(baseline);
} catch (ZkNoNodeException ex) {
// Metadata does not exist, so return an empty map
_bestPossibleAssignment = Collections.emptyMap();
}
}
return _bestPossibleAssignment;
}
Expand Down Expand Up @@ -113,6 +124,16 @@ public void persistBestPossibleAssignment(
_bestPossibleAssignment = bestPossibleAssignment;
}

protected void finalize() {
// To ensure all resources are released.
close();
}

// Close to release all the resources.
public void close() {
_dataAccessor.disconnect();
}

/**
* Produces one HelixProperty that contains all assignment data.
* @param name
Expand All @@ -123,8 +144,9 @@ private HelixProperty combineAssignments(String name,
Map<String, ResourceAssignment> assignmentMap) {
HelixProperty property = new HelixProperty(name);
// Add each resource's assignment as a simple field in one ZNRecord
// Node that don't use Arrays.toString() for the record converting. The deserialize will fail.
assignmentMap.forEach((resource, assignment) -> property.getRecord().setSimpleField(resource,
Arrays.toString(SERIALIZER.serialize(assignment.getRecord()))));
new String(SERIALIZER.serialize(assignment.getRecord()))));
Comment thread
jiajunwang marked this conversation as resolved.
return property;
}

Expand All @@ -138,8 +160,8 @@ private Map<String, ResourceAssignment> splitAssignments(HelixProperty property)
// Convert each resource's assignment String into a ResourceAssignment object and put it in a
// map
property.getRecord().getSimpleFields()
.forEach((resource, assignment) -> assignmentMap.put(resource,
new ResourceAssignment((ZNRecord) SERIALIZER.deserialize(assignment.getBytes()))));
.forEach((resource, assignmentStr) -> assignmentMap.put(resource,
new ResourceAssignment((ZNRecord) SERIALIZER.deserialize(assignmentStr.getBytes()))));
return assignmentMap;
}
}
Loading