Skip to content
Prev Previous commit
Next Next commit
address @chia7712's feedback
  • Loading branch information
OmniaGM committed Aug 1, 2024
commit 5a4d3d5d0dfbe54a0a71a22a7913370447077bf7
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,13 @@ public final class TransactionLogConfig {
// Configuration for testing only as default value should be sufficient for typical usage
.defineInternal(PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_CONFIG, INT, PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT, atLeast(1), LOW, PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DOC);

private AbstractConfig config;
private int transactionTopicMinISR;
private int transactionsLoadBufferSize;
private short transactionTopicReplicationFactor;
private int transactionTopicPartitions;
private int transactionTopicSegmentBytes;
private int producerIdExpirationCheckIntervalMs;
private final AbstractConfig config;
private final int transactionTopicMinISR;
private final int transactionsLoadBufferSize;
private final short transactionTopicReplicationFactor;
private final int transactionTopicPartitions;
private final int transactionTopicSegmentBytes;
private final int producerIdExpirationCheckIntervalMs;

public TransactionLogConfig(AbstractConfig config) {
this.config = config;
Expand Down Expand Up @@ -112,17 +112,17 @@ public int transactionTopicSegmentBytes() {
return transactionTopicSegmentBytes;
}

public int producerIdExpirationCheckIntervalMs() {
return producerIdExpirationCheckIntervalMs;
}

// This is a broker dynamic config used for DynamicProducerStateManagerConfig
public Boolean transactionPartitionVerificationEnable() {
return config.getBoolean(TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could you please add comments to for those dynamic configs?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even with the comments, I'm not sure I understand why this is not initialized in the constructor.

Can you explain how this is supposed to work? AbstractConfig instances are kind of read only by default but it looks like we're expecting it to change here. Am I missing something?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we had a discussion for that before (#16458 (comment))

AbstractConfig is read-only, but the sub class KafkaConfig can update inner configs dynamically (see https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/KafkaConfig.scala#L192). Hence, the passed AbstractConfig always have "up-to-date configs"

There is a jira which tries to refactor it (https://issues.apache.org/jira/browse/KAFKA-17001), but I haven't worked on it because that will introduce major changes to scala code.

In short, the basic rules for now are shown below.

  1. the non-dynamic config will be evaluated in constructor.
  2. the dynamic config will be evaluated in getter (as AbstractConfig always have latest configs)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the explanations!

}

// This is a broker dynamic config used for DynamicProducerStateManagerConfig
public int producerIdExpirationMs() {
return config.getInt(PRODUCER_ID_EXPIRATION_MS_CONFIG);
}

public int producerIdExpirationCheckIntervalMs() {
return producerIdExpirationCheckIntervalMs;
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,10 @@ public final class TransactionStateManagerConfig {
.define(TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG, INT, TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_DEFAULT, atLeast(1), LOW, TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTIONS_INTERVAL_MS_DOC)
.define(TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONAL_ID_CLEANUP_INTERVAL_MS_CONFIG, INT, TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONAL_ID_CLEANUP_INTERVAL_MS_DEFAULT, atLeast(1), LOW, TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONS_INTERVAL_MS_DOC);

private int transactionalIdExpirationMs;
private int transactionMaxTimeoutMs;
private int transactionAbortTimedOutTransactionCleanupIntervalMs;
private int transactionRemoveExpiredTransactionalIdCleanupIntervalMs;
private final int transactionalIdExpirationMs;
private final int transactionMaxTimeoutMs;
private final int transactionAbortTimedOutTransactionCleanupIntervalMs;
private final int transactionRemoveExpiredTransactionalIdCleanupIntervalMs;

public TransactionStateManagerConfig(AbstractConfig config) {
transactionalIdExpirationMs = config.getInt(TransactionStateManagerConfig.TRANSACTIONAL_ID_EXPIRATION_MS_CONFIG);
Expand All @@ -81,7 +81,4 @@ public int transactionAbortTimedOutTransactionCleanupIntervalMs() {
public int transactionRemoveExpiredTransactionalIdCleanupIntervalMs() {
return transactionRemoveExpiredTransactionalIdCleanupIntervalMs;
}



}