KAFKA-15265: Dynamic broker configs for remote fetch/copy quotas #16078

Merged
satishd merged 2 commits into apache:trunk from abhijeetk88:KAFKA-15434_dynamic_config
Jun 12, 2024


@abhijeetk88
Contributor

@abhijeetk88 abhijeetk88 commented May 24, 2024

In this PR, I have added dynamic config support for the remote fetch/copy quotas (reference: KIP-956).

Added unit tests for the new dynamic broker configs.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@abhijeetk88 abhijeetk88 marked this pull request as draft May 24, 2024 19:47
@abhijeetk88 abhijeetk88 changed the title from "[WIP] KAFKA-15265: Dynamic broker configs for remote fetch/copy quotas" to "KAFKA-15265: Dynamic broker configs for remote fetch/copy quotas" May 25, 2024
@abhijeetk88 abhijeetk88 marked this pull request as ready for review May 25, 2024 08:17
@satishd satishd requested review from hudeqi, junrao, kamalcph, satishd and showuon and removed request for hudeqi May 27, 2024 04:46
@satishd satishd added the tiered-storage Related to the Tiered Storage feature label May 27, 2024
Contributor

@kamalcph kamalcph left a comment

LGTM (reviewed the 4th commit in the PR)

@showuon
Member

showuon commented May 31, 2024

@abhijeetk88, please help resolve the merge conflict. Thanks.

@abhijeetk88 abhijeetk88 force-pushed the KAFKA-15434_dynamic_config branch from 0e34986 to f226aae Compare May 31, 2024 12:19
@abhijeetk88
Contributor Author

@showuon I have updated the PR after resolving conflicts. Please take a look.

Member

@satishd satishd left a comment

Thanks @abhijeetk88 for the PR. I left a few comments.

      newConfig.values.forEach { (k, v) =>
        if (reconfigurableConfigs.contains(k)) {
    -     if (k.equals(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP)) {
    +     if (k.equals(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP) ||
Member

I see that this is the existing logic for the existing config key RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP, for the respective Long values.

Although all the valid keys are already checked by the earlier line if (reconfigurableConfigs.contains(k)), they are repeated here to check and cast the value as Long. Can we avoid the double check by removing the earlier reconfigurableConfigs.contains(k) check?

Contributor Author

I wasn't sure why there was double-checking initially, so I did not want to change it and just added the check for the new configs. But I think it should be OK to remove the double check. Will fix this.
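A minimal sketch of validation with a single key check, under stated assumptions: the property names are placeholders rather than the real RemoteLogManagerConfig constants, and IllegalArgumentException stands in for Kafka's ConfigException so that the example has no dependencies. It is not the code from this PR.

```scala
object ValidateSketch {
  // Placeholder keys (assumption); the real names are defined in RemoteLogManagerConfig.
  val LongProps: Set[String] = Set(
    "example.index.cache.size.bytes",
    "example.copy.max.bytes.per.second",
    "example.fetch.max.bytes.per.second"
  )

  // One membership test decides whether a key is validated; no outer contains check is needed.
  def validate(newValues: Map[String, Any]): Unit =
    newValues.foreach { case (k, v) =>
      if (LongProps.contains(k)) {
        val value = v.asInstanceOf[Long]
        if (value < 1) {
          val errorMsg = s"Dynamic remote log manager config update validation failed for $k=$v"
          // Stand-in for ConfigException (assumption).
          throw new IllegalArgumentException(s"$errorMsg, value should be at least 1")
        }
      }
    }
}
```

Keys outside LongProps are ignored by this sketch, which is the behaviour the removed outer check used to provide.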

        remoteLogManager.get.resizeCacheSize(newValue)
        info(s"Dynamic remote log manager config: ${RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP} updated, " +
          s"old value: $oldValue, new value: $newValue")
      } else if (isChangedLongValue(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP)) {
Member

This method should take care of updating every one of the respective configs. That means it should check each config individually, replacing else if with if; with an else if chain, only the first changed config is applied when several are updated together.

It would also be good to add a test that updates multiple configs and checks that all of them are updated.

Contributor Author

Thanks for catching this. Fixed this and also added a test.
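To illustrate the difference discussed in this thread, here is a small self-contained sketch. The Quotas case class and both methods are hypothetical stand-ins, not the broker's actual reconfigure logic; they only show which changes an else if chain handles compared with independent if checks.

```scala
object ReconfigureSketch {
  // Hypothetical snapshot of the three Long-valued settings (assumption).
  final case class Quotas(cacheSize: Long, copyQuota: Long, fetchQuota: Long)

  // else-if chain: stops at the first changed value, so later changes are skipped.
  def changedWithElseIf(old: Quotas, updated: Quotas): List[String] = {
    val applied = scala.collection.mutable.ListBuffer.empty[String]
    if (old.cacheSize != updated.cacheSize) {
      applied += "cacheSize"
    } else if (old.copyQuota != updated.copyQuota) {
      applied += "copyQuota"
    } else if (old.fetchQuota != updated.fetchQuota) {
      applied += "fetchQuota"
    }
    applied.toList
  }

  // Independent ifs: every changed value is handled in the same pass.
  def changedWithIndependentIfs(old: Quotas, updated: Quotas): List[String] = {
    val applied = scala.collection.mutable.ListBuffer.empty[String]
    if (old.cacheSize != updated.cacheSize) { applied += "cacheSize" }
    if (old.copyQuota != updated.copyQuota) { applied += "copyQuota" }
    if (old.fetchQuota != updated.fetchQuota) { applied += "fetchQuota" }
    applied.toList
  }
}
```

When both quotas change in one update, the first method reports only the copy quota, while the second reports both. That is the case a multi-config test would need to cover.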

    name match {
      case RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP =>
        config.getLong(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP)
      case RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP =>
Member

This match expression can be simplified as follows:

    name match {
      case RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP |
           RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP |
           RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP =>
        config.getLong(name)
      case n => throw new IllegalStateException(s"Unexpected dynamic remote log manager config $n")
    }

Contributor Author

Makes sense.
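A runnable sketch of the same pattern-alternative idea, assuming placeholder keys and a plain Map in place of the broker config object (both are illustrative, not Kafka's API):

```scala
object MatchSketch {
  // Placeholder keys (assumption); the real constants live in RemoteLogManagerConfig.
  object Keys {
    val CacheSize = "example.index.cache.size.bytes"
    val CopyQuota = "example.copy.max.bytes.per.second"
    val FetchQuota = "example.fetch.max.bytes.per.second"
  }

  // One case with `|` alternatives replaces three near-identical cases.
  def longValue(name: String, values: Map[String, Long]): Long = name match {
    case Keys.CacheSize | Keys.CopyQuota | Keys.FetchQuota => values(name)
    case n => throw new IllegalStateException(s"Unexpected dynamic remote log manager config $n")
  }
}
```

Because the matched key is already bound to name, the lookup can reuse it instead of repeating each constant on the right-hand side.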

@abhijeetk88
Contributor Author

@satishd I have addressed your comments. Please take a look.

        val errorMsg = s"Dynamic remote log manager config update validation failed for $k=$v"
        throw new ConfigException(s"$errorMsg, value should be at least 1")
      }
      if (k.equals(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP) ||
Contributor

@kamalcph kamalcph Jun 9, 2024

Can we invert the check, since we removed the contains check? What if k is null?

if (RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP.equals(k) || ...

Contributor Author

I'm not completely sure whether k can be null, but I changed it since it is safer.
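The null-safety argument can be shown with a short sketch. The constant below is a placeholder, not the real property name; the point is only which side of equals the possibly-null value sits on.

```scala
object NullSafeEqualsSketch {
  val CopyQuotaProp = "example.copy.max.bytes.per.second" // placeholder key (assumption)

  // Calls equals on k, so a null k throws NullPointerException.
  def unsafeMatches(k: String): Boolean = k.equals(CopyQuotaProp)

  // Calls equals on the non-null constant, so a null k simply yields false.
  def safeMatches(k: String): Boolean = CopyQuotaProp.equals(k)
}
```

In Scala, the == operator also handles a null left-hand side without throwing, so k == CopyQuotaProp would be another null-safe way to write the comparison.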

Contributor

@kamalcph kamalcph left a comment

Please rebase the branch onto trunk.

    // Update default config
    props.put(copyQuotaProp, "100")
    config.dynamicConfig.updateDefaultConfig(props)
    assertEquals(100, config.getLong(copyQuotaProp))
Contributor

Should we move the following getter methods from RemoteLogManagerConfig to the KafkaConfig class?

  1. remoteLogManagerCopyMaxBytesPerSecond
  2. remoteLogManagerFetchMaxBytesPerSecond
  3. remoteLogIndexFileCacheTotalSizeBytes

Contributor Author

Should we take this up in a follow-up PR? I want to avoid unrelated changes here.

    }

    @Test
    def testRemoteLogManagerFetchQuotaUpdates(): Unit = {
Contributor

This test is similar to testRemoteLogManagerCopyQuotaUpdates. Can we refactor it to extract a shared helper method, or use a parameterized test?

Contributor Author

Fixed it.
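One way such a refactor could look is sketched below. FakeConfig and verifyQuotaUpdate are hypothetical names invented for illustration; the sketch avoids JUnit and KafkaConfig so that it stays self-contained, and it does not reproduce the helper actually added in this PR.

```scala
object QuotaTestSketch {
  // Hypothetical in-memory stand-in for a broker config with a dynamic override layer.
  final class FakeConfig(defaults: Map[String, Long]) {
    private var overrides = Map.empty[String, Long]
    def update(prop: String, value: Long): Unit = overrides += (prop -> value)
    def getLong(prop: String): Long = overrides.getOrElse(prop, defaults(prop))
  }

  // Shared body that both the copy-quota and the fetch-quota test can call.
  def verifyQuotaUpdate(config: FakeConfig, prop: String, newValue: Long): Unit = {
    config.update(prop, newValue)
    assert(config.getLong(prop) == newValue, s"$prop was not updated")
  }
}
```

Each test then reduces to a single call with its own property name, for example verifyQuotaUpdate(config, copyQuotaProp, 100L), and a parameterized test could feed the same helper a list of property names.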

@abhijeetk88 abhijeetk88 force-pushed the KAFKA-15434_dynamic_config branch from 5173c4f to f72c000 Compare June 12, 2024 05:26
@abhijeetk88 abhijeetk88 force-pushed the KAFKA-15434_dynamic_config branch from f72c000 to 189e859 Compare June 12, 2024 05:42
Contributor

@kamalcph kamalcph left a comment

LGTM, thanks for the patch!

Member

@satishd satishd left a comment

Thanks @abhijeetk88 for addressing the review comments. LGTM.

@satishd
Member

satishd commented Jun 12, 2024

There were a few unrelated test failures. Merging it to the trunk and 3.8 branches.

@satishd satishd merged commit b5fb654 into apache:trunk Jun 12, 2024
satishd pushed a commit that referenced this pull request Jun 12, 2024
)

Reviewers: Kamal Chandraprakash <kamal.chandraprakash@gmail.com>, Satish Duggana <satishd@apache.org>
apourchet added a commit to apourchet/kafka that referenced this pull request Jun 12, 2024
commit 9368ef8
Author: Gantigmaa Selenge <39860586+tinaselenge@users.noreply.github.com>
Date:   Wed Jun 12 16:04:24 2024 +0100

    KAFKA-16865: Add IncludeTopicAuthorizedOperations option for DescribeTopicPartitionsRequest (apache#16136)

    Reviewers: Mickael Maison <mickael.maison@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>, Calvin Liu <caliu@confluent.io>, Andrew Schofield <andrew_schofield@live.com>, Apoorv Mittal <amittal@confluent.io>

commit 46eb081
Author: gongxuanzhang <gongxuanzhang@foxmail.com>
Date:   Wed Jun 12 22:23:39 2024 +0800

    KAFKA-10787 Apply spotless to log4j-appender, trogdor, jmh-benchmarks, examples, shell and generator (apache#16296)

    Reviewers: Chia-Ping Tsai <chia7712@gmail.com>

commit 79b9c44
Author: gongxuanzhang <gongxuanzhang@foxmail.com>
Date:   Wed Jun 12 22:19:47 2024 +0800

    KAFKA-10787 Apply spotless to connect module (apache#16299)

    Reviewers: Chia-Ping Tsai <chia7712@gmail.com>

commit b5fb654
Author: Abhijeet Kumar <abhijeet.cse.kgp@gmail.com>
Date:   Wed Jun 12 19:47:46 2024 +0530

    KAFKA-15265: Dynamic broker configs for remote fetch/copy quotas (apache#16078)

    Reviewers: Kamal Chandraprakash <kamal.chandraprakash@gmail.com>, Satish Duggana <satishd@apache.org>

commit faee6a4
Author: Dmitry Werner <grimekillah@gmail.com>
Date:   Wed Jun 12 15:44:11 2024 +0500

    MINOR: Use predetermined dir IDs in ReplicationQuotasTest

    Use predetermined directory IDs instead of Uuid.randomUuid() in ReplicationQuotasTest.

    Reviewers: Igor Soarez <soarez@apple.com>

commit 638844f
Author: David Jacot <djacot@confluent.io>
Date:   Wed Jun 12 08:29:50 2024 +0200

    KAFKA-16770; [2/2] Coalesce records into bigger batches (apache#16215)

    This patch is the continuation of apache#15964. It introduces records coalescing to the CoordinatorRuntime. It also introduces a new configuration `group.coordinator.append.linger.ms` which allows administrators to choose the linger time or disable it with zero. The new configuration defaults to 10ms.

    Reviewers: Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>

commit 39ffdea
Author: Bruno Cadonna <cadonna@apache.org>
Date:   Wed Jun 12 07:51:38 2024 +0200

    KAFKA-10199: Enable state updater by default (apache#16107)

    We have already enabled the state updater by default once.
    However, we ran into issues that forced us to disable it again.
    We think that we fixed those issues. So we want to enable the
    state updater again by default.

    Reviewers: Lucas Brutschy <lbrutschy@confluent.io>, Matthias J. Sax <matthias@confluent.io>

commit 0782232
Author: Antoine Pourchet <antoine@responsive.dev>
Date:   Tue Jun 11 22:31:43 2024 -0600

    KAFKA-15045: (KIP-924 pt. 22) Add RackAwareOptimizationParams and other minor TaskAssignmentUtils changes (apache#16294)

    We now provide a way to more easily customize the rack aware
    optimizations that we provide by way of a configuration class called
    RackAwareOptimizationParams.

    We also simplified the APIs for the optimizeXYZ utility functions since
    they were mutating the inputs anyway.

    Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>

commit 226ac5e
Author: Murali Basani <muralidhar.basani@aiven.io>
Date:   Wed Jun 12 05:38:50 2024 +0200

    KAFKA-16922 Adding unit tests for NewTopic  (apache#16255)

    Reviewers: Chia-Ping Tsai <chia7712@gmail.com>

commit 23fe71d
Author: Abhijeet Kumar <abhijeet.cse.kgp@gmail.com>
Date:   Wed Jun 12 06:27:02 2024 +0530

    KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage (apache#15820)

    - Added the integration of the quota manager to throttle copy requests to the remote storage. Reference KIP-956
    - Added unit-tests for the copy throttling logic.

    Reviewers: Satish Duggana <satishd@apache.org>, Luke Chen <showuon@gmail.com>, Kamal Chandraprakash <kamal.chandraprakash@gmail.com>

commit 2fa2c72
Author: Chris Egerton <chrise@aiven.io>
Date:   Tue Jun 11 23:15:07 2024 +0200

    MINOR: Wait for embedded clusters to start before using them in Connect OffsetsApiIntegrationTest (apache#16286)

    Reviewers: Greg Harris <greg.harris@aiven.io>

Labels

tiered-storage Related to the Tiered Storage feature

4 participants