Skip to content

[SPARK-49064][BUILD] Upgrade Kafka to 3.8.0#47540

Closed
panbingkun wants to merge 4 commits intoapache:masterfrom
panbingkun:SPARK-49064
Closed

[SPARK-49064][BUILD] Upgrade Kafka to 3.8.0#47540
panbingkun wants to merge 4 commits intoapache:masterfrom
panbingkun:SPARK-49064

Conversation

@panbingkun
Copy link
Copy Markdown
Contributor

@panbingkun panbingkun commented Jul 31, 2024

What changes were proposed in this pull request?

The pr aims to upgrade kafka from 3.7.1 to 3.8.0.

Why are the changes needed?

https://downloads.apache.org/kafka/3.8.0/RELEASE_NOTES.html

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Pass GA.

Was this patch authored or co-authored using generative AI tooling?

No.

@github-actions github-actions bot added the BUILD label Jul 31, 2024
@panbingkun panbingkun changed the title [WIP][SPARK-49064][BUILD] Upgrade Kafka to 3.8.0 [SPARK-49064][BUILD] Upgrade Kafka to 3.8.0 Jul 31, 2024
@dongjoon-hyun
Copy link
Copy Markdown
Member

Since the title is changed, could you make it Ready for review?

private val cleanupLogsPrivateMethod = PrivateMethod[LogManager](Symbol("cleanupLogs"))
def cleanupLogs(): Unit = {
server.logManager.cleanupLogs()
server.logManager.invokePrivate(cleanupLogsPrivateMethod())
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that Kafka community changed this without the official KAFKA issue.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, that's right, it was this PR that caused this change.

messages.foreach { case (k, v) =>
val record = new SimpleRecord(k.getBytes, v.getBytes)
log.appendAsLeader(MemoryRecords.withRecords(CompressionType.NONE, record), 0);
log.appendAsLeader(MemoryRecords.withRecords(Compression.NONE, Seq(record): _*), 0);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a new feature of Kafka 3.8.0 (KAFKA-7632, KIP-390).

Copy link
Copy Markdown
Member

@dongjoon-hyun dongjoon-hyun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, LGTM for Apache Spark 4.0.0-preview2. Thank you for keeping tracking this area, @panbingkun .

}

private val cleanupLogsPrivateMethod = PrivateMethod[LogManager](Symbol("cleanupLogs"))
def cleanupLogs(): Unit = {
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

val record = new SimpleRecord(k.getBytes, v.getBytes)
log.appendAsLeader(MemoryRecords.withRecords(CompressionType.NONE, record), 0);
log.appendAsLeader(MemoryRecords.withRecords(Compression.NONE, Seq(record): _*), 0);
}
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@panbingkun panbingkun marked this pull request as ready for review July 31, 2024 06:37
@panbingkun
Copy link
Copy Markdown
Contributor Author

Since the title is changed, could you make it Ready for review?

Done.

@panbingkun
Copy link
Copy Markdown
Contributor Author

+1, LGTM for Apache Spark 4.0.0-preview2. Thank you for keeping tracking this area, @panbingkun .

Thank you for your review! ❤️

@dongjoon-hyun
Copy link
Copy Markdown
Member

The failed OracleIntegrationSuite is irrelevant to this PR.

Merged to master for Apache Spark 4.0.0-preview2.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants