Skip to content

KAFKA-18311: Internal Topic Manager (5/5)#18442

Merged
lucasbru merged 3 commits intoapache:trunkfrom
lucasbru:kip1071merge/topic_config5
Jan 21, 2025
Merged

KAFKA-18311: Internal Topic Manager (5/5)#18442
lucasbru merged 3 commits intoapache:trunkfrom
lucasbru:kip1071merge/topic_config5

Conversation

@lucasbru
Copy link
Copy Markdown
Member

@lucasbru lucasbru commented Jan 8, 2025

The internal topic manager takes a requested topology and returns a configured topology, as well as any internal topics that need to be created.

Shares with the client-side "InternalTopicManager" the name only.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@lucasbru lucasbru changed the title KAFKA-18311: Internal Topic Manager KAFKA-18311: Internal Topic Manager (5/5) Jan 8, 2025
@lucasbru lucasbru requested review from bbejeck, cadonna and mjsax January 8, 2025 12:45
@lucasbru
Copy link
Copy Markdown
Member Author

lucasbru commented Jan 8, 2025

PTAL @aliehsaeedii

@lucasbru lucasbru force-pushed the kip1071merge/topic_config5 branch from 42f344d to 3ee997b Compare January 8, 2025 12:49
@cadonna cadonna added streams core Kafka Broker KIP-1071 PRs related to KIP-1071 labels Jan 9, 2025
@lucasbru lucasbru force-pushed the kip1071merge/topic_config5 branch from 3ee997b to 3b195a0 Compare January 10, 2025 14:14
@lucasbru
Copy link
Copy Markdown
Member Author

@cadonna I moved the source topic check as you suggested.

Copy link
Copy Markdown
Member

@cadonna cadonna left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR, @lucasbru !

Here my comments.

/**
* Immutable topic metadata, representing the current state of a topic in the broker.
*
* @param id The topic id.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

Suggested change
* @param id The topic id.
* @param id The topic ID.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Comment on lines +38 to +43
public TopicMetadata(
Uuid id,
String name,
int numPartitions,
Map<Integer, Set<String>> partitionRacks
) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The following is the code style we usually use in Streams and that is more readable IMO.

Suggested change
public TopicMetadata(
Uuid id,
String name,
int numPartitions,
Map<Integer, Set<String>> partitionRacks
) {
public TopicMetadata(Uuid id,
String name,
int numPartitions,
Map<Integer, Set<String>> partitionRacks) {

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Comment on lines +59 to +61
public static TopicMetadata fromRecord(
StreamsGroupPartitionMetadataValue.TopicMetadata record
) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
public static TopicMetadata fromRecord(
StreamsGroupPartitionMetadataValue.TopicMetadata record
) {
public static TopicMetadata fromRecord(StreamsGroupPartitionMetadataValue.TopicMetadata record) {

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

}

@Test
public void testConstructorWithNegativeNumPartitions() {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add a test with partition number 0?
This seems a special case that should be documented with a unit test.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, actually not sure why the implementation allows this. I changed it to require positive, let's see if it breaks anything.

public class TopicMetadataTest {

@Test
public void testConstructorWithZeroUuid() {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A positive constructor unit test is missing.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Comment on lines +279 to +280
private static Collection<Set<String>> copartitionGroupsFromPersistedSubtopology(
final StreamsGroupTopologyValue.Subtopology subtopology) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
private static Collection<Set<String>> copartitionGroupsFromPersistedSubtopology(
final StreamsGroupTopologyValue.Subtopology subtopology) {
private static Collection<Set<String>> copartitionGroupsFromPersistedSubtopology(final StreamsGroupTopologyValue.Subtopology subtopology) {

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will make the line too long, and my IDE (based on imported checkstyle config) will wrap it the same way.... I added another line break.

I didn't know this was an accepted standard in the streams code that everybody follows. Is there something I can do in my IDE to follow this? I imported the official Kafka checkstyle configuration, and it produces this kind of wrapping. It would be good if our "review nitpicking" and automated formatting tools somewhat agree.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree but I do also not know where it comes from. I would put it in one line although it exceeds the 120 characters, but it is also fine as you did it. The best would be to have an automatic formatter, but we never manged in the past to setup one. There was an attempt: #10428 but it was never merged.

Comment on lines +260 to +262
private static ConfiguredInternalTopic fromPersistedTopicInfo(
final StreamsGroupTopologyValue.TopicInfo topicInfo,
Map<String, Integer> decidedPartitionCountsForInternalTopics) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
private static ConfiguredInternalTopic fromPersistedTopicInfo(
final StreamsGroupTopologyValue.TopicInfo topicInfo,
Map<String, Integer> decidedPartitionCountsForInternalTopics) {
private static ConfiguredInternalTopic fromPersistedTopicInfo(final StreamsGroupTopologyValue.TopicInfo topicInfo,
final Map<String, Integer> decidedPartitionCountsForInternalTopics) {

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Comment on lines +244 to +247
private static ConfiguredSubtopology fromPersistedSubtopology(
final StreamsGroupTopologyValue.Subtopology subtopology,
Map<String, Integer> decidedPartitionCountsForInternalTopics
) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
private static ConfiguredSubtopology fromPersistedSubtopology(
final StreamsGroupTopologyValue.Subtopology subtopology,
Map<String, Integer> decidedPartitionCountsForInternalTopics
) {
private static ConfiguredSubtopology fromPersistedSubtopology(final StreamsGroupTopologyValue.Subtopology subtopology,
final Map<String, Integer> decidedPartitionCountsForInternalTopics
) {

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Comment on lines +151 to +172
final Set<String> fixedRepartitionTopics =
topology.subtopologies().values().stream().flatMap(x ->
x.repartitionSourceTopics().stream().filter(y -> y.partitions() != 0)
).map(StreamsGroupTopologyValue.TopicInfo::name).collect(Collectors.toSet());
final Set<String> flexibleRepartitionTopics =
topology.subtopologies().values().stream().flatMap(x ->
x.repartitionSourceTopics().stream().filter(y -> y.partitions() == 0)
).map(StreamsGroupTopologyValue.TopicInfo::name).collect(Collectors.toSet());

if (fixedRepartitionTopics.isEmpty() && flexibleRepartitionTopics.isEmpty()) {
log.info("Skipping the repartition topic validation since there are no repartition topics.");
} else {
// ensure the co-partitioning topics within the group have the same number of partitions,
// and enforce the number of partitions for those repartition topics to be the same if they
// are co-partitioned as well.
for (Collection<Set<String>> copartitionGroups : copartitionGroupsBySubtopology.values()) {
for (Set<String> copartitionGroup : copartitionGroups) {
decidedPartitionCountsForInternalTopics.putAll(
copartitionedTopicsEnforcer.enforce(copartitionGroup, fixedRepartitionTopics, flexibleRepartitionTopics));
}
}
}
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe export this block into a private method, so that we have each of repartition topics setup, co-partition enforcer, and changelog topcis setup in one line. Just a proposal.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Copy link
Copy Markdown
Member Author

@lucasbru lucasbru left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the review, @cadonna! Ready for re-review.

Comment on lines +38 to +43
public TopicMetadata(
Uuid id,
String name,
int numPartitions,
Map<Integer, Set<String>> partitionRacks
) {
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

/**
* Immutable topic metadata, representing the current state of a topic in the broker.
*
* @param id The topic id.
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Comment on lines +59 to +61
public static TopicMetadata fromRecord(
StreamsGroupPartitionMetadataValue.TopicMetadata record
) {
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Comment on lines +151 to +172
final Set<String> fixedRepartitionTopics =
topology.subtopologies().values().stream().flatMap(x ->
x.repartitionSourceTopics().stream().filter(y -> y.partitions() != 0)
).map(StreamsGroupTopologyValue.TopicInfo::name).collect(Collectors.toSet());
final Set<String> flexibleRepartitionTopics =
topology.subtopologies().values().stream().flatMap(x ->
x.repartitionSourceTopics().stream().filter(y -> y.partitions() == 0)
).map(StreamsGroupTopologyValue.TopicInfo::name).collect(Collectors.toSet());

if (fixedRepartitionTopics.isEmpty() && flexibleRepartitionTopics.isEmpty()) {
log.info("Skipping the repartition topic validation since there are no repartition topics.");
} else {
// ensure the co-partitioning topics within the group have the same number of partitions,
// and enforce the number of partitions for those repartition topics to be the same if they
// are co-partitioned as well.
for (Collection<Set<String>> copartitionGroups : copartitionGroupsBySubtopology.values()) {
for (Set<String> copartitionGroup : copartitionGroups) {
decidedPartitionCountsForInternalTopics.putAll(
copartitionedTopicsEnforcer.enforce(copartitionGroup, fixedRepartitionTopics, flexibleRepartitionTopics));
}
}
}
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Comment on lines +244 to +247
private static ConfiguredSubtopology fromPersistedSubtopology(
final StreamsGroupTopologyValue.Subtopology subtopology,
Map<String, Integer> decidedPartitionCountsForInternalTopics
) {
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Comment on lines +260 to +262
private static ConfiguredInternalTopic fromPersistedTopicInfo(
final StreamsGroupTopologyValue.TopicInfo topicInfo,
Map<String, Integer> decidedPartitionCountsForInternalTopics) {
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Comment on lines +279 to +280
private static Collection<Set<String>> copartitionGroupsFromPersistedSubtopology(
final StreamsGroupTopologyValue.Subtopology subtopology) {
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will make the line too long, and my IDE (based on imported checkstyle config) will wrap it the same way.... I added another line break.

I didn't know this was an accepted standard in the streams code that everybody follows. Is there something I can do in my IDE to follow this? I imported the official Kafka checkstyle configuration, and it produces this kind of wrapping. It would be good if our "review nitpicking" and automated formatting tools somewhat agree.

}

@Test
public void testConstructorWithNegativeNumPartitions() {
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, actually not sure why the implementation allows this. I changed it to require positive, let's see if it breaks anything.

public class TopicMetadataTest {

@Test
public void testConstructorWithZeroUuid() {
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Copy link
Copy Markdown
Member

@cadonna cadonna left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the updates!

I had just one very minor comment. I leave it to you whether you want to change that or not.

Comment on lines +151 to +152
enforceCopartitioning(topology, copartitionGroupsBySubtopology, log,
decidedPartitionCountsForInternalTopics, copartitionedTopicsEnforcer);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

Suggested change
enforceCopartitioning(topology, copartitionGroupsBySubtopology, log,
decidedPartitionCountsForInternalTopics, copartitionedTopicsEnforcer);
enforceCopartitioning(
topology,
copartitionGroupsBySubtopology,
log,
decidedPartitionCountsForInternalTopics,
copartitionedTopicsEnforcer
);

documented here: https://kafka.apache.org/coding-guide
There are always exceptions like log-messages.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Comment on lines +279 to +280
private static Collection<Set<String>> copartitionGroupsFromPersistedSubtopology(
final StreamsGroupTopologyValue.Subtopology subtopology) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree but I do also not know where it comes from. I would put it in one line although it exceeds the 120 characters, but it is also fine as you did it. The best would be to have an automatic formatter, but we never manged in the past to setup one. There was an attempt: #10428 but it was never merged.

@lucasbru lucasbru merged commit 7d39ba1 into apache:trunk Jan 21, 2025
pranavt84 pushed a commit to pranavt84/kafka that referenced this pull request Jan 27, 2025
The internal topic manager takes a requested topology and returns a configured topology, as well as any internal topics that need to be created.

Shares with the client-side "InternalTopicManager" the name only.

Reviewers: Bruno Cadonna <cadonna@apache.org>
airlock-confluentinc bot pushed a commit to confluentinc/kafka that referenced this pull request Jan 27, 2025
The internal topic manager takes a requested topology and returns a configured topology, as well as any internal topics that need to be created.

Shares with the client-side "InternalTopicManager" the name only.

Reviewers: Bruno Cadonna <cadonna@apache.org>
manoj-mathivanan pushed a commit to manoj-mathivanan/kafka that referenced this pull request Feb 19, 2025
The internal topic manager takes a requested topology and returns a configured topology, as well as any internal topics that need to be created.

Shares with the client-side "InternalTopicManager" the name only.

Reviewers: Bruno Cadonna <cadonna@apache.org>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

core Kafka Broker KIP-1071 PRs related to KIP-1071 streams

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants