KAFKA-16909 Refactor GroupCoordinatorConfig with AbstractConfig#16458
KAFKA-16909 Refactor GroupCoordinatorConfig with AbstractConfig#16458chia7712 merged 10 commits intoapache:trunkfrom
Conversation
chia7712
left a comment
There was a problem hiding this comment.
@brandboat thanks for this patch
| config.consumerGroupMigrationPolicy, | ||
| config.offsetsTopicCompressionType | ||
| ) | ||
| val groupCoordinatorConfig = new GroupCoordinatorConfig(config) |
There was a problem hiding this comment.
Could we add a method to KafkaConfig to return GroupCoordinatorConfig? With that change, we can remove all GroupCoordinatorConfig-related getters from KafkaConfig
There was a problem hiding this comment.
Thank you for the suggestion! I have added the method to KafkaConfig to return GroupCoordinatorConfig and removed all related getters. This change is included in the latest commits.
chia7712
left a comment
There was a problem hiding this comment.
@brandboat thanks for this patch
| OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder() | ||
| .withGroupMetadataManager(groupMetadataManager) | ||
| .withOffsetsRetentionMs(1000) | ||
| .withOffsetsRetentionMinutes(1) |
There was a problem hiding this comment.
not sure why we need this change. Also, the value is changed from 1 second to 1 minute?
There was a problem hiding this comment.
Actually the original time unit of config offsets.retention.minutes is in minutes, and before this refactor we pass offsets.retention in millis to GroupCoordinatorConfig constructor and use it in OffsetMetadataManagerTest, now we pass AbstractConfig (i.e. KafkaConfig) to GroupCoordinatorConfig, which means we should pass offsets.retention in minutes instead of ms
| configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, ConsumerGroupMigrationPolicy.DISABLED.name()); | ||
| configs.put(GroupCoordinatorConfig.OFFSETS_TOPIC_COMPRESSION_CODEC_CONFIG, (int) CompressionType.NONE.id); | ||
|
|
||
| return new GroupCoordinatorConfig(new GroupCoordinatorTestConfig(configs)); |
There was a problem hiding this comment.
return new GroupCoordinatorConfig(new AbstractConfig(Utils.mergeConfigs(Arrays.asList(
GroupCoordinatorConfig.GROUP_COORDINATOR_CONFIG_DEF,
GroupCoordinatorConfig.NEW_GROUP_CONFIG_DEF,
GroupCoordinatorConfig.OFFSET_MANAGEMENT_CONFIG_DEF,
GroupCoordinatorConfig.CONSUMER_GROUP_CONFIG_DEF)),
configs, false));WDYT? we don't need the temporary class
There was a problem hiding this comment.
Will do, thanks for the suggestion.
| val groupCoordinatorAppendLingerMs = getInt(GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG) | ||
|
|
||
| /** Consumer group configs */ | ||
| val consumerGroupSessionTimeoutMs = getInt(GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG) |
There was a problem hiding this comment.
@OmniaGM Could you please take a look? Does it follow your idea that we should move getters out of KafkaConfig?
There was a problem hiding this comment.
Sorry for late response here, this looks great! I think the validators could also be moved out as well
There was a problem hiding this comment.
Sure, we will file a MINOR for it
dajac
left a comment
There was a problem hiding this comment.
@brandboat @chia7712 Thanks for the PR. Overall, I agree with the change. However, I have left a few suggestions for consideration. Please let me know what you think.
| public ConsumerGroupMigrationPolicy consumerGroupMigrationPolicy() { | ||
| return ConsumerGroupMigrationPolicy.parse( | ||
| config.getString(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG)); | ||
| } |
There was a problem hiding this comment.
I would like to point out that this change is a bit risky in my opinion because it does not ensure during the startup that the migration policy is really correct. If it fails somehow, it will fail later on when consumerGroupMigrationPolicy is accessed for the first time. I wonder if we should keep local attributes and initialize them in the constructor.
There was a problem hiding this comment.
The validation is addressed by the GroupCoordinatorConfig's config definition.
Hence, the string value is valid in constructing GroupCoordinatorConfig
There was a problem hiding this comment.
Indeed. I was thinking about the case where the validation has a bug or is not good enough.
| public int offsetCommitTimeoutMs() { | ||
| return config.getInt(GroupCoordinatorConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG); | ||
| } |
There was a problem hiding this comment.
This is an example of the config which is accessed extremely frequently (there are other like this too). I think that having attributes would be better as it avoid having to look it up in the config every time.
There was a problem hiding this comment.
This is a good question!
The approach of this PR is to make sure GroupCoordinatorConfig can see the latest configs to avoid potential bugs (#16394). However, it has side effect which brings extra cost: "volatile" and "lookup/parse".
- The cost of "volatile" (or other similar sync trick) is required if we make
GroupCoordinatorConfigsee latest configs. - The cost of "lookup/parse" could be eliminated if we do a bit refactor for it. For example, we pass
Supplier<GroupCoordinatorConfig>instead ofGroupCoordinatorConfigto the callers. By that changes, we can makeGroupCoordinatorConfighave all immutable pre-created local attributes. The impl ofSupplier<GroupCoordinatorConfig>will be generated byKafkaConfigand it looks likeAtomicReference::get. However, the side effect is that the usage will get a little ugly: "config.numThreads()" -> "config.get().numThreads()"
WDYT? BTW, we had a related discussion in https://issues.apache.org/jira/browse/KAFKA-17001
There was a problem hiding this comment.
None of the configs in this class are dynamic so I am not sure that it is worth it. We could perhaps mention it in the javadoc of the class. If we introduce a dynamic config, we should indeed not use an attribute for it.
For the context, in KafkaConfig, we always had the distinction between val (static values) and def (dynamic ones). We could do the same here, I suppose.
|
I double-check all configs and yes all of them are not dynamic. Maybe we don't need to be over-engineering for now. Hence, we can have a PR for following changes.
@brandboat @dajac @OmniaGM PTAL, I hope this can be a guideline for all similar config class |
|
@brandboat I open https://issues.apache.org/jira/browse/KAFKA-17081 as follow-up. And it is assigned to you. PLEASE feel free to assign it back to me if you have no bandwidth |
|
@chia7712 Thanks. Sounds good to me! |
…he#16458) Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
related to https://issues.apache.org/jira/browse/KAFKA-16909
as title, this pr follows RemoteLogManagerConfig.java, pass AbstractConfig to constructor.
Committer Checklist (excluded from commit message)