KAFKA-7632: Support Compression Levels (KIP-390) #15516
mimaison merged 6 commits into apache:trunk
Conversation
@divijvaidya It seems you've done a bit of work around compression in the past. Can you take a look? Thanks
showuon left a comment:
Thanks for the PR. Left some comments.
```diff
  // No in place assignment situation 1
- boolean inPlaceAssignment = sourceCompression == targetCompression;
+ boolean inPlaceAssignment = sourceCompressionType == targetCompression.type();
```
So we won't do re-compression if only level is different? I didn't see this in KIP. Maybe we should add it?
The broker has no easy way of retrieving the level the producer used when compressing the records. So if the compression codec matches, I decided to keep the compressed bytes instead of decompressing and compressing everything again, as this would be wasteful, especially since the producer could have already used the same compression level.
I agree, but I think we should at least mention this in KIP-390.
Right, I updated the KIP.
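The decision described above can be sketched as follows. This is a minimal, hypothetical illustration of the rule (codec compared, level ignored), not the actual Kafka broker code; the class and enum names are made up for the example.

```java
// Sketch of the in-place assignment rule discussed above: recompression is
// skipped whenever the codec matches, because the broker cannot recover the
// level the producer used. All names here are illustrative, not Kafka's own.
public class InPlaceAssignmentSketch {
    enum CompressionType { NONE, GZIP, SNAPPY, LZ4, ZSTD }

    // Only the codec is compared; a differing level does not force recompression.
    static boolean inPlaceAssignment(CompressionType sourceType, CompressionType targetType) {
        return sourceType == targetType;
    }

    public static void main(String[] args) {
        // Same codec, possibly different level -> keep the compressed bytes as-is.
        System.out.println(inPlaceAssignment(CompressionType.ZSTD, CompressionType.ZSTD));
        // Different codec -> decompress and recompress.
        System.out.println(inPlaceAssignment(CompressionType.GZIP, CompressionType.ZSTD));
    }
}
```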
```java
public static final String COMPRESSION_GZIP_LEVEL_DOC = "The compression level to use if " + COMPRESSION_TYPE_CONFIG + " is set to <code>gzip</code>.";
public static final String COMPRESSION_LZ4_LEVEL_CONFIG = "compression.lz4.level";
public static final String COMPRESSION_LZ4_LEVEL_DOC = "The compression level to use if " + COMPRESSION_TYPE_CONFIG + " is set to <code>lz4</code>.";
public static final String COMPRESSION_ZSTD_LEVEL_CONFIG = "compression.zstd.level";
public static final String COMPRESSION_ZSTD_LEVEL_DOC = "The compression level to use if " + COMPRESSION_TYPE_CONFIG + " is set to <code>zstd</code>.";
```
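As a quick illustration of how a client might set one of these per-codec level configs, here is a minimal sketch. String keys are used so the snippet stays self-contained; in real client code the `ProducerConfig` constants above would be preferred, and only the level matching the chosen codec takes effect.

```java
import java.util.Properties;

// Minimal sketch of configuring a compression level alongside the codec.
// Uses raw string keys to stay self-contained; real code would reference
// the ProducerConfig constants defined above.
public class CompressionLevelConfig {
    static Properties buildProps() {
        Properties props = new Properties();
        props.put("compression.type", "gzip");
        // gzip accepts levels 1-9; levels for other codecs would be ignored here.
        props.put("compression.gzip.level", "6");
        return props;
    }

    public static void main(String[] args) {
        Properties props = buildProps();
        System.out.println(props.getProperty("compression.type") + ":"
                + props.getProperty("compression.gzip.level"));
    }
}
```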
nit: Should we provide the doc link for each compression type? It's hard to know which level means what.
Do you mean a link to the compression library websites?
I was thinking we could add it in the config description. Or maybe adding it to KIP-390 is good enough.
```java
public static final int MIN_LEVEL = 1;
public static final int MAX_LEVEL = 17;
public static final int DEFAULT_LEVEL = 9;
```
So, every time we update the Lz4 library, we may need to update the above values? We probably want to add a note here.
I hesitated to define these constants for this reason, but these levels have not changed in over 10 years [0], so hopefully this won't require much maintenance.
0: https://github.com/lz4/lz4-java/blame/master/src/java/net/jpountz/lz4/LZ4Constants.java#L23-L24
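The constants above lend themselves to a simple range check. The following is a hedged sketch of how a requested LZ4 level might be validated; the helper name is made up for illustration and is not the actual Kafka validation code.

```java
// Illustrative validation of an LZ4 level against the constants discussed
// above (1..17, default 9). The helper name is hypothetical.
public class Lz4LevelCheck {
    static final int MIN_LEVEL = 1;
    static final int MAX_LEVEL = 17;
    static final int DEFAULT_LEVEL = 9;

    // Falls back to the default when no level is configured; rejects
    // out-of-range values rather than silently clamping them.
    static int validateLevel(Integer requested) {
        if (requested == null)
            return DEFAULT_LEVEL;
        if (requested < MIN_LEVEL || requested > MAX_LEVEL)
            throw new IllegalArgumentException(
                "lz4 level must be in [" + MIN_LEVEL + ", " + MAX_LEVEL + "]: " + requested);
        return requested;
    }

    public static void main(String[] args) {
        System.out.println(validateLevel(null)); // default level
        System.out.println(validateLevel(17));   // max level accepted
    }
}
```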
I also added a couple of new tests in LogValidatorTest to check that recompression only happens if the compression codec differs between the records from the producer and the topic configuration, and does not happen if only the compression levels differ.
showuon left a comment:
Had another look. LGTM! Just some comments to update the KIP. Thanks.
Thanks for the reviews!
None of the test failures seem related, merging to trunk |
Reviewers: Jun Rao <jun@confluent.io>, Luke Chen <showuon@gmail.com> Co-authored-by: Lee Dongjin <dongjin@apache.org>
Has anybody noticed that the Linear Write test in KIP-390 is inaccurate?
Yeah, it looks like the numbers are not accurate. To be honest, it's a bit of a strange performance test; the Produce Test benchmark should be much more representative. However, I did not reproduce that benchmark. It might be worth asking @dongjinleekr if he has any data points to share.
```java
ByteBufferOutputStream bufferOutputStream,
final long deleteHorizonMs) {
    byte magic = originalBatch.magic();
    Compression compression = Compression.of(originalBatch.compressionType()).build();
```
1. I'm not sure whether there has been any discussion about the compression level used during compaction. The current implementation just applies the default level, but perhaps it should respect the topic's configured compression level.
2. Another brainstorm is to introduce a flag that allows compaction to use a different compression. This would give users the option to choose a different compression algorithm for older data.
1. Right, here we could use the level if specified. I expect most topics to use `compression.type=producer`, but in case a specific compression type and level is set, it would make sense to use them.
2. It's not something I thought about before. Do you think there are scenarios where the gains of picking a different level for older data would be significant enough to motivate such a feature?
That’s an interesting topic.
For 1., it would be intuitive and reasonable to use the compression level that is set. I'll create a minor patch for it.
For 2., I'm not entirely sure, but one possible case is that users may prefer tighter compression for cold data to save space, especially if storage is cost-sensitive.
> Right, here we could use the level if specified. I expect most topics to use `compression.type=producer` but in case a specific compression type and level is set, that would make sense to use them.
@Yunyung Could you please file a minor patch for it?
> Do you think there are scenarios where the gains of picking a different level for older data would be significant enough to motivate such a feature?
The key point is the compression type rather than level. I received a request to compress old data during compaction. The change should be straightforward, so it seems acceptable.
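The idea discussed here, preferring the topic's configured compression during compaction and falling back to the original batch's codec when the topic uses `producer`, can be sketched as follows. All names are illustrative; this is not the actual cleaner code.

```java
// Sketch of choosing the compaction codec: an explicit topic-level setting
// wins, while "producer" keeps whatever codec the original batch used.
// Names and string representation are hypothetical simplifications.
public class CompactionCompressionChoice {
    static String chooseCodec(String topicCompression, String originalBatchCodec) {
        // "producer" means: keep the codec the producer chose.
        if ("producer".equals(topicCompression))
            return originalBatchCodec;
        return topicCompression;
    }

    public static void main(String[] args) {
        System.out.println(chooseCodec("producer", "lz4")); // keeps the batch codec
        System.out.println(chooseCodec("zstd", "lz4"));     // topic setting wins
    }
}
```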
@chia7712 : Previously, we considered using the topic-level compression type instead of the one in the original batch. There is one subtle issue on batch size. During compaction, we group a set of segments so that the total size doesn't exceed 2GB. If we use a different compression type, the compacted data could exceed the max segment limit and fail the index append.
> If we use a different compression type, the compacted data could exceed the max segment limit and fail the index append.
It appears that using a high compression level during ingestion can indeed trigger an overflow issue during compaction. This is because the cleaner's size estimation becomes inaccurate when it rebuilds batches using the default compression level instead of the original one:
```
[2025-12-28 08:25:42,915] WARN [kafka-log-cleaner-thread-0]: Unexpected exception thrown when cleaning log Log(dir=/tmp/log-folder-0/chia-0, topicId=5vKt56jQQ_S3QRuqrNmrTw, topic=chia, partition=0, highWatermark=66028296, lastStableOffset=66028296, logStartOffset=0, logEndOffset=66028296). Marking its partition (chia-0) as uncleanable (org.apache.kafka.storage.internals.log.LogCleaner$CleanerThread)
org.apache.kafka.storage.internals.log.LogCleaningException: Append of size 258080 bytes is too large for segment with current file position at 2147262463
	at org.apache.kafka.storage.internals.log.LogCleaner$CleanerThread.cleanFilthiestLog(LogCleaner.java:570)
	at org.apache.kafka.storage.internals.log.LogCleaner$CleanerThread.tryCleanFilthiestLog(LogCleaner.java:544)
	at org.apache.kafka.storage.internals.log.LogCleaner$CleanerThread.doWork(LogCleaner.java:513)
	at org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:136)
Caused by: java.lang.IllegalArgumentException: Append of size 258080 bytes is too large for segment with current file position at 2147262463
	at org.apache.kafka.common.record.FileRecords.append(FileRecords.java:196)
	at org.apache.kafka.storage.internals.log.LogSegment.append(LogSegment.java:260)
	at org.apache.kafka.storage.internals.log.Cleaner.cleanInto(Cleaner.java:405)
	at org.apache.kafka.storage.internals.log.Cleaner.cleanSegments(Cleaner.java:243)
	at org.apache.kafka.storage.internals.log.Cleaner.doClean(Cleaner.java:180)
	at org.apache.kafka.storage.internals.log.Cleaner.clean(Cleaner.java:127)
	at org.apache.kafka.storage.internals.log.LogCleaner$CleanerThread.cleanLog(LogCleaner.java:596)
	at org.apache.kafka.storage.internals.log.LogCleaner$CleanerThread.cleanFilthiestLog(LogCleaner.java:565)
	... 3 more
```
To mitigate this, I propose introducing a safety margin specifically for partitions that fail to compact due to overflows.
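The proposed safety margin amounts to reserving headroom below the segment file limit when checking whether an append fits. The sketch below is a hypothetical illustration of that check, with numbers taken from the failing append in the log above; it is not the actual cleaner implementation.

```java
// Hypothetical sketch of a cleaner-side safety margin: an append is only
// considered to fit if it stays within the ~2GB file limit minus a reserved
// margin, so an under-estimated recompressed batch still has room.
public class CleanerSafetyMargin {
    static final long MAX_SEGMENT_BYTES = Integer.MAX_VALUE; // ~2GB file limit

    static boolean fitsWithMargin(long filePosition, long appendSize, long margin) {
        return filePosition + appendSize <= MAX_SEGMENT_BYTES - margin;
    }

    public static void main(String[] args) {
        // The failing append from the log above: size 258080 at position 2147262463
        // overflows the limit even with no margin at all.
        System.out.println(fitsWithMargin(2147262463L, 258080L, 0L));
        // With the position well below the limit, a 1MB margin still leaves room.
        System.out.println(fitsWithMargin(2000000000L, 258080L, 1 << 20));
    }
}
```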
Another option is to make the cleaning logic more generic so that it could produce more than 1 segment during each round of cleaning if the segment size limit is exceeded.
Theoretically, having the cleaner produce multiple segments is the superior design. However, the current implementation strictly pre-groups the segments. Simply splitting an overflowing group would result in fragmented, undersized segments. To handle this gracefully would require major architectural surgery on the grouping logic.
Given that this overflow is a rare edge case, requiring a specific combination of large segments, significant compression drop, and low deletion rates, the adaptive safety margin is a much more pragmatic win.
@junrao WDYT?
Good find!
Thanks for following up

Based on #10826, with updates to match the recent amendments we made to KIP-390.