KAFKA-19909: Add rack awareness assignment to UniformHomogeneousAssignmentBuilder#20000
KAFKA-19909: Add rack awareness assignment to UniformHomogeneousAssignmentBuilder#20000FrankYang0529 wants to merge 12 commits into
Conversation
|
This PR is being marked as stale since it has not had any activity in 90 days. If you If you are having difficulty finding a reviewer, please reach out on the [mailing list](https://kafka.apache.org/contact). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed. |
…nmentBuilder Signed-off-by: PoAn Yang <payang@apache.org>
1bd5902 to
2ae9377
Compare
Signed-off-by: PoAn Yang <payang@apache.org>
|
@FrankYang0529 Thanks for the patch! Have you already tried to run the micro-benchmarks? The performance of the assignors is critical so we must take a performance driven approach. |
|
I did some tests, but the performance is not good. I will try to improve it tomorrow. PR: Uniform Homogeneous rack awareness vs non rack awarenessTrunk: Uniform Homogeneous non rack awareness |
Signed-off-by: PoAn Yang <payang@apache.org>
|
New commit improves performance a little, but overall it's still not good. I will try other ways tomorrow. PR: Uniform Homogeneous rack awareness vs non rack awareness |
Signed-off-by: PoAn Yang <payang@apache.org>
Signed-off-by: PoAn Yang <payang@apache.org>
PR: Uniform Homogeneous rack awareness vs non rack awareness |
|
Hi @dajac, following is summary for trunk & PR comparison. The difference between trunk and PR for non-rackaware is from checking whether to use rack awareness strategy (Step 1). Current PR implementation needs The performance check for step 1 takes about
|
| // If the new assignment is immutable, we must create a deep copy of it | ||
| // before altering it. | ||
| newAssignment = AssignorHelpers.deepCopyAssignment(newAssignment); | ||
| targetAssignment.put(memberId, new MemberAssignmentImpl(newAssignment)); |
There was a problem hiding this comment.
#20097 introduced a performance regression. After that change MemberAssignmentImpl always wraps the new assignment in another immutable map, so we will always deep copy here. Can we fix the regression in a separate PR?
There was a problem hiding this comment.
I filed https://issues.apache.org/jira/browse/KAFKA-19955 and opened #21058 to fix it. It might improve the rack-aware times by a bit.
There was a problem hiding this comment.
The fix has been merged
| Set<String> allPartitionRacks = new HashSet<>(); | ||
| for (Uuid topicId : this.subscribedTopicIds) { | ||
| int partitionCount = subscribedTopicDescriber.numPartitions(topicId); | ||
| for (int partitionId = 0; partitionId < partitionCount; partitionId++) { | ||
| Set<String> racks = subscribedTopicDescriber.racksForPartition(topicId, partitionId); | ||
| partitionRacks.put(new TopicIdPartition(topicId, partitionId), racks); | ||
| allPartitionRacks.addAll(racks); | ||
| } | ||
| } |
There was a problem hiding this comment.
A lot of the constructor cost is in gathering the racks of all partitions. @dajac mentioned we could try shortcutting the rack-aware part if no members have racks.
Or we could also check if no members have racks matching any broker, since clients can misconfigure their racks. The number of racks is <= the number of brokers, which is small and we can collect the set of broker racks cheaply from the MetadataImage underlying the SubscribedTopicDescriber.
There was a problem hiding this comment.
@dajac mentioned we could try shortcutting the rack-aware part if no members have racks.
Yes, good suggestion. I changed the code to skip gathering the racks of all partitions if no members have racks. However, for rack-awareness assignment, it still needs to gather information and the worst case may cost 293.988 ms for 500,000 topic-partitions to 10,000 members. Can we afford this cost in server side? Thanks.
Signed-off-by: PoAn Yang <payang@apache.org>
There was a problem hiding this comment.
Pull request overview
This PR adds rack awareness support to the UniformHomogeneousAssignmentBuilder to optimize partition assignments by minimizing cross-zone traffic. The implementation follows a priority of Balance > Rack Awareness > Stickiness.
Key Changes:
- Implements a 4-step rack-aware assignment algorithm that checks for rack overlap, revokes mismatched partitions, assigns partitions based on rack matching, and handles remaining unassigned partitions
- Adds helper methods to determine when rack-aware assignment is beneficial and to validate rack matching
- Refactors the quota distribution logic to support dynamic assignment during rack-aware processing
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformHomogeneousAssignmentBuilder.java |
Core implementation of rack-aware assignment algorithm with partition revocation and rack-matching assignment logic |
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AssignorHelpers.java |
Added utility methods isRackMatch() and useRackAwareAssignment() to support rack-aware decisions |
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/UniformHomogeneousAssignmentBuilderTest.java |
Comprehensive test coverage for rack-aware scenarios including first assignments, reassignments, and edge cases |
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/AssignorHelpersTest.java |
Unit tests for new rack-aware helper methods |
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/CommonAssignorTests.java |
Updated test utilities to handle mutable assignments when rack awareness is enabled |
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/AssignorBenchmarkUtils.java |
Enhanced benchmark metadata creation to include rack information for performance testing |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| // that went in. | ||
| assertSame(membersWithAssignment.get(memberId).partitions(), secondAssignment.members().get(memberId).partitions()); | ||
| if (rackAware) { | ||
| // With rack awareness, the assignment maps may be mutable cause of revoking non-matched partitions. |
There was a problem hiding this comment.
Grammatical error: "cause of" should be "because of".
| // With rack awareness, the assignment maps may be mutable cause of revoking non-matched partitions. | |
| // With rack awareness, the assignment maps may be mutable because of revoking non-matched partitions. |
| if (allMemberRacks.isEmpty() || Collections.disjoint(allMemberRacks, allPartitionRacks)) | ||
| return false; | ||
| else { | ||
| return !racksPerPartition.values().stream().allMatch(allPartitionRacks::equals); | ||
| } |
There was a problem hiding this comment.
[nitpick] The if-else statement could be simplified by removing the else block and directly returning the condition. Consider:
return !allMemberRacks.isEmpty()
&& !Collections.disjoint(allMemberRacks, allPartitionRacks)
&& !racksPerPartition.values().stream().allMatch(allPartitionRacks::equals);This is more concise and avoids the unnecessary else block.
| if (allMemberRacks.isEmpty() || Collections.disjoint(allMemberRacks, allPartitionRacks)) | |
| return false; | |
| else { | |
| return !racksPerPartition.values().stream().allMatch(allPartitionRacks::equals); | |
| } | |
| return !allMemberRacks.isEmpty() | |
| && !Collections.disjoint(allMemberRacks, allPartitionRacks) | |
| && !racksPerPartition.values().stream().allMatch(allPartitionRacks::equals); |
|
Performance for skipping partition racks collection if member racks is empty. PR: Uniform Homogeneous rack awareness vs non rack awareness@dajac @squah-confluent I follow the suggestion and the performance for non-rackaware cases are similar as trunk branch. However, the rackaware cases still cost around 160ms. Since we only run assignment algorithm when group epoch is larger than member epoch, I think the result don't influence too much for normal heartbeat request. WDYT? Thank you.
|
|
@FrankYang0529 Thanks for collecting the new benchmark results. The non-rack aware numbers look good. The rack aware number are better but still a little slow. It's not ideal to be blocking the group coordinator thread for 150 ms. Maybe this won't be too bad in practice, since
If we really want to, I think it's possible to improve performance further by re-designing the Separately I have some concerns about stickiness when static members are replaced. The group coordinator assigns the new static member a new member id and keeps the previous assignment, so the order of member ids is not stable. How expensive would it be to track the previous owner of partitions in |
|
This PR is being marked as stale since it has not had any activity in 90 days. If you If you are having difficulty finding a reviewer, please reach out on the [mailing list](https://kafka.apache.org/contact). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed. |
Support rack awareness in
UniformHomogeneousAssignmentBuilder. Thegoal is balance > rack awareness > stickiness.
Step 1: Check whether to use rack awareness strategy.
Using rack awareness strategy only if member racks and partition racks
overlap and not all partitions have the same set of racks. If we don't
need to use rack awareness strategy, we don't do rack check for further
steps.
Step 2: Revoke partitions.
Revoke partitions if a member doesn't have enough quota or member and
partition racks are mismatched.
Step 3: Assign partitions based on rack awareness.
We only run step 3 if step 1 is true. Go through unassigned partitions
and assign them to members which have enough quota and matched racks.
Step 4: Assign remaining partitions.
Assign remaining partitions based on unfilled quota on each members.