Changes from 1 commit (19 commits in this pull request):
45cfc64 Remove MemoryRecordsBuilder(ByteBuffer, byte, CompressionType, Timest… (dongjinleekr, Jun 18, 2019)
7333166 a builder to build MemoryRecordsBuilder with a fluent method call, al… (dongjinleekr, Jun 20, 2019)
a61e8ac Remove MemoryRecords#builder(ByteBuffer, CompressionType, TimestampTy… (dongjinleekr, Jun 21, 2019)
929b38a Remove MemoryRecords#builder(ByteBuffer, byte, CompressionType, Times… (dongjinleekr, Jun 21, 2019)
1ad88d6 Remove MemoryRecords#builder(ByteBuffer, byte, CompressionType, Times… (dongjinleekr, Jun 21, 2019)
e0a1fb9 Remove MemoryRecords#builder(ByteBuffer, byte, CompressionType, Times… (dongjinleekr, Jun 21, 2019)
83d9662 Remove MemoryRecords#builder(ByteBuffer, CompressionType, long, long,… (dongjinleekr, Jun 21, 2019)
7f927d5 Remove MemoryRecords#builder(ByteBuffer, byte, CompressionType, Times… (dongjinleekr, Jun 21, 2019)
fe694f0 Remove MemoryRecords#builder(ByteBuffer, byte, CompressionType, Times… (dongjinleekr, Jun 21, 2019)
cd21bb0 Remove MemoryRecords#builder(ByteBuffer, byte, CompressionType, Times… (dongjinleekr, Jun 21, 2019)
bd6377e Refactor MemoryRecords#writeEndTransactionalMarker to use MemoryRecor… (dongjinleekr, Jun 22, 2019)
3f4b2f4 Refactor MemoryRecords#withRecords to use MemoryRecords#builder, inst… (dongjinleekr, Jun 22, 2019)
0cfcef0 Add BrokerCompressionTest#testGetTargetCompressionCodec: validates Br… (dongjinleekr, Jun 22, 2019)
a48fd96 BrokerCompressionCodec#getTargetCompressionCodec -> BrokerCompression… (dongjinleekr, Jun 22, 2019)
ebe2f1a Refactor KafkaLZ4BlockOutputStream, KafkaLZ4Test + Add support to com… (dongjinleekr, Jun 21, 2019)
b2c90b8 Add GZipOutputStream to support compression level, block size for Gzip (dongjinleekr, Jun 21, 2019)
3c91af6 Add CompressionConfig + expand CompressionType#wrapForOutput (dongjinleekr, Jun 22, 2019)
b6c4074 Make broker to use CompressionConfig (dongjinleekr, Jun 22, 2019)
d116b47 Make RecordAccumulator to use CompressionConfig (dongjinleekr, Jun 24, 2019)
Commit d116b47271c19888f586ba70e7a7dc2519e9bd8d
Make RecordAccumulator to use CompressionConfig
dongjinleekr committed Jun 24, 2019
File: KafkaProducer.java
@@ -60,6 +60,7 @@
import org.apache.kafka.common.network.ChannelBuilder;
import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.record.AbstractRecords;
+ import org.apache.kafka.common.record.CompressionConfig;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.serialization.Serializer;
@@ -395,7 +396,7 @@ public KafkaProducer(Properties properties, Serializer<K> keySerializer, Seriali
this.apiVersions = new ApiVersions();
this.accumulator = new RecordAccumulator(logContext,
config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
- this.compressionType,
+ CompressionConfig.of(this.compressionType),
lingerMs(config),
retryBackoffMs,
deliveryTimeoutMs,
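The producer's only change here is wrapping its configured CompressionType via CompressionConfig.of(...). The CompressionConfig class itself (added in commit 3c91af6) is outside this commit's diff, so the following is only a minimal sketch of the shape its call sites in this diff imply (of(), none(), getType()); the real class presumably also carries the compression level and buffer size mentioned in the updated javadoc below.

// A hedged sketch only: the real CompressionConfig (commit 3c91af6) lives in
// org.apache.kafka.common.record and likely also holds a level and buffer size.
public final class CompressionConfig {
    private final CompressionType type;

    private CompressionConfig(CompressionType type) {
        this.type = type;
    }

    // Matches the CompressionConfig.of(compressionType) call in KafkaProducer above.
    public static CompressionConfig of(CompressionType type) {
        return new CompressionConfig(type);
    }

    // Matches the CompressionConfig.none() calls in the tests below.
    public static CompressionConfig none() {
        return of(CompressionType.NONE);
    }

    // Matches compressionConfig.getType() in RecordAccumulator below.
    public CompressionType getType() {
        return type;
    }
}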
File: RecordAccumulator.java
@@ -45,8 +45,8 @@
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Meter;
import org.apache.kafka.common.record.AbstractRecords;
+ import org.apache.kafka.common.record.CompressionConfig;
import org.apache.kafka.common.record.CompressionRatioEstimator;
- import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.Record;
@@ -71,7 +71,7 @@ public final class RecordAccumulator {
private final AtomicInteger flushesInProgress;
private final AtomicInteger appendsInProgress;
private final int batchSize;
- private final CompressionType compression;
+ private final CompressionConfig compressionConfig;
private final int lingerMs;
private final long retryBackoffMs;
private final int deliveryTimeoutMs;
@@ -91,7 +91,7 @@ public final class RecordAccumulator {
*
* @param logContext The log context used for logging
* @param batchSize The size to use when allocating {@link MemoryRecords} instances
- * @param compression The compression codec for the records
+ * @param compressionConfig The compression type/level/buffer size for the records
* @param lingerMs An artificial delay time to add before declaring a records instance that isn't full ready for
* sending. This allows time for more records to arrive. Setting a non-zero lingerMs will trade off some
* latency for potentially better throughput due to more batching (and hence fewer, larger requests).
@@ -105,7 +105,7 @@
*/
public RecordAccumulator(LogContext logContext,
int batchSize,
- CompressionType compression,
+ CompressionConfig compressionConfig,
int lingerMs,
long retryBackoffMs,
int deliveryTimeoutMs,
@@ -121,7 +121,7 @@ public RecordAccumulator(LogContext logContext,
this.flushesInProgress = new AtomicInteger(0);
this.appendsInProgress = new AtomicInteger(0);
this.batchSize = batchSize;
- this.compression = compression;
+ this.compressionConfig = compressionConfig;
this.lingerMs = lingerMs;
this.retryBackoffMs = retryBackoffMs;
this.deliveryTimeoutMs = deliveryTimeoutMs;
@@ -205,7 +205,7 @@ public RecordAppendResult append(TopicPartition tp,

// we don't have an in-progress record batch try to allocate a new batch
byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
- int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
+ int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compressionConfig.getType(), key, value, headers));
log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
buffer = free.allocate(size, maxTimeToBlock);
synchronized (dq) {
@@ -242,7 +242,10 @@ private MemoryRecordsBuilder recordsBuilder(ByteBuffer buffer, byte maxUsableMag
throw new UnsupportedVersionException("Attempting to use idempotence with a broker which does not " +
"support the required message format (v2). The broker must be version 0.11 or later.");
}
- return MemoryRecords.builder(buffer).magic(maxUsableMagic).compressionType(compression).build();
+ return MemoryRecords.builder(buffer)
+     .magic(maxUsableMagic)
+     .compressionConfig(compressionConfig)
+     .build();
}

/**
@@ -340,7 +343,7 @@ public int splitAndReenqueue(ProducerBatch bigBatch) {
// Reset the estimated compression ratio to the initial value or the big batch compression ratio, whichever
// is bigger. There are several different ways to do the reset. We chose the most conservative one to ensure
// the split doesn't happen too often.
- CompressionRatioEstimator.setEstimation(bigBatch.topicPartition.topic(), compression,
+ CompressionRatioEstimator.setEstimation(bigBatch.topicPartition.topic(), compressionConfig.getType(),
Math.max(1.0f, (float) bigBatch.compressionRatio()));
Deque<ProducerBatch> dq = bigBatch.split(this.batchSize);
int numSplitBatches = dq.size();
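The recordsBuilder() change above is where the fluent builder from the earlier commits (7333166 and the overload-removal commits) pays off. A before/after sketch, with the removed positional overload abbreviated since its exact signature is not shown in this diff:

// Before: one of the removed positional overloads; argument meaning was easy to mix up.
// MemoryRecordsBuilder builder = MemoryRecords.builder(
//         buffer, maxUsableMagic, compression, /* ...timestamp type, base offset, ... */);

// After: every setting is named, and type/level/buffer size travel together
// as a single CompressionConfig value.
MemoryRecordsBuilder builder = MemoryRecords.builder(buffer)
        .magic(maxUsableMagic)
        .compressionConfig(compressionConfig)
        .build();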
File: RecordAccumulatorTest.java
@@ -28,6 +28,7 @@
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.ApiKeys;
+ import org.apache.kafka.common.record.CompressionConfig;
import org.apache.kafka.common.record.CompressionRatioEstimator;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.DefaultRecord;
@@ -338,7 +339,7 @@ public void testRetryBackoff() throws Exception {
String metricGrpName = "producer-metrics";

final RecordAccumulator accum = new RecordAccumulator(logContext, batchSize,
- CompressionType.NONE, lingerMs, retryBackoffMs, deliveryTimeoutMs, metrics, metricGrpName, time, new ApiVersions(), null,
+ CompressionConfig.none(), lingerMs, retryBackoffMs, deliveryTimeoutMs, metrics, metricGrpName, time, new ApiVersions(), null,
new BufferPool(totalSize, batchSize, metrics, time, metricGrpName));

long now = time.milliseconds();
@@ -707,7 +708,7 @@ public void testIdempotenceWithOldMagic() throws InterruptedException {
apiVersions.update("foobar", NodeApiVersions.create(Arrays.asList(new ApiVersionsResponse.ApiVersion(ApiKeys.PRODUCE,
(short) 0, (short) 2))));
RecordAccumulator accum = new RecordAccumulator(logContext, batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD,
- CompressionType.NONE, lingerMs, retryBackoffMs, deliveryTimeoutMs, metrics, metricGrpName, time, apiVersions, new TransactionManager(),
+ CompressionConfig.none(), lingerMs, retryBackoffMs, deliveryTimeoutMs, metrics, metricGrpName, time, apiVersions, new TransactionManager(),
new BufferPool(totalSize, batchSize, metrics, time, metricGrpName));
accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, 0);
}
@@ -1005,7 +1006,7 @@ private RecordAccumulator createTestRecordAccumulator(int deliveryTimeoutMs, int
return new RecordAccumulator(
logContext,
batchSize,
- type,
+ CompressionConfig.of(type),
lingerMs,
retryBackoffMs,
deliveryTimeoutMs,
File: SenderTest.java
@@ -47,6 +47,7 @@
import org.apache.kafka.common.network.NetworkReceive;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
+ import org.apache.kafka.common.record.CompressionConfig;
import org.apache.kafka.common.record.CompressionRatioEstimator;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
@@ -1913,7 +1914,7 @@ private void testSplitBatchAndSend(TransactionManager txnManager,
// Set a good compression ratio.
CompressionRatioEstimator.setEstimation(topic, CompressionType.GZIP, 0.2f);
try (Metrics m = new Metrics()) {
- accumulator = new RecordAccumulator(logContext, batchSize, CompressionType.GZIP,
+ accumulator = new RecordAccumulator(logContext, batchSize, CompressionConfig.of(CompressionType.GZIP),
0, 0L, deliveryTimeoutMs, m, metricGrpName, time, new ApiVersions(), txnManager,
new BufferPool(totalSize, batchSize, metrics, time, "producer-internal-metrics"));
SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
@@ -2414,7 +2415,7 @@ private void setupWithTransactionState(TransactionManager transactionManager, bo
this.metrics = new Metrics(metricConfig, time);
BufferPool pool = (customPool == null) ? new BufferPool(totalSize, batchSize, metrics, time, metricGrpName) : customPool;

- this.accumulator = new RecordAccumulator(logContext, batchSize, CompressionType.NONE, 0, 0L,
+ this.accumulator = new RecordAccumulator(logContext, batchSize, CompressionConfig.none(), 0, 0L,
deliveryTimeoutMs, metrics, metricGrpName, time, apiVersions, transactionManager, pool);
this.senderMetricsRegistry = new SenderMetricsRegistry(this.metrics);
this.sender = new Sender(logContext, this.client, this.metadata, this.accumulator, guaranteeOrder, MAX_REQUEST_SIZE, ACKS_ALL,
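One detail worth noting in testSplitBatchAndSend above: CompressionRatioEstimator stays keyed by the raw CompressionType while the accumulator now takes the wrapping CompressionConfig. A small sketch of that relationship, using only calls that appear in this diff (topic is a String here):

// Hypothetical call-site pairing: the estimator is keyed by CompressionType,
// so code holding a CompressionConfig unwraps it with getType().
CompressionConfig config = CompressionConfig.of(CompressionType.GZIP);
CompressionRatioEstimator.setEstimation(topic, config.getType(), 0.2f);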
File: TransactionManagerTest.java
@@ -39,7 +39,7 @@
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.Errors;
- import org.apache.kafka.common.record.CompressionType;
+ import org.apache.kafka.common.record.CompressionConfig;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.MutableRecordBatch;
@@ -134,7 +134,7 @@ public void setup() {
Metrics metrics = new Metrics(metricConfig, time);
SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(metrics);

- this.accumulator = new RecordAccumulator(logContext, batchSize, CompressionType.NONE, 0, 0L,
+ this.accumulator = new RecordAccumulator(logContext, batchSize, CompressionConfig.none(), 0, 0L,
deliveryTimeoutMs, metrics, metricGrpName, time, apiVersions, transactionManager,
new BufferPool(totalSize, batchSize, metrics, time, metricGrpName));
this.sender = new Sender(logContext, this.client, this.metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL,