From 5e1e6964a6abac81ee6cb0888415bff6593add5d Mon Sep 17 00:00:00 2001 From: yuezhang Date: Tue, 3 Nov 2020 16:51:31 +0800 Subject: [PATCH 01/19] remove build in kafka consumer config : --- .../org/apache/druid/indexing/kafka/KafkaConsumerConfigs.java | 1 - 1 file changed, 1 deletion(-) diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerConfigs.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerConfigs.java index e9c6e79358a3..365be19cc03c 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerConfigs.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerConfigs.java @@ -38,7 +38,6 @@ public static Map getConsumerProperties() props.put("group.id", StringUtils.format("kafka-supervisor-%s", IdUtils.getRandomId())); props.put("auto.offset.reset", "none"); props.put("enable.auto.commit", "false"); - props.put("isolation.level", "read_committed"); return props; } From ac6dd037a910461462ee9c1e9994176c4c7aab4c Mon Sep 17 00:00:00 2001 From: yuezhang Date: Tue, 3 Nov 2020 17:03:30 +0800 Subject: [PATCH 02/19] modify druid docs of kafka indexing service --- docs/development/extensions-core/kafka-ingestion.md | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/docs/development/extensions-core/kafka-ingestion.md b/docs/development/extensions-core/kafka-ingestion.md index 3eb3499fb9e7..b4ef89bfe530 100644 --- a/docs/development/extensions-core/kafka-ingestion.md +++ b/docs/development/extensions-core/kafka-ingestion.md @@ -33,8 +33,9 @@ manage failures, and ensure that the scalability and replication requirements ar This service is provided in the `druid-kafka-indexing-service` core Apache Druid extension (see [Including Extensions](../../development/extensions.md#loading-extensions)). -> The Kafka indexing service supports transactional topics which were introduced in Kafka 0.11.x. These changes make the -> Kafka consumer that Druid uses incompatible with older brokers. Ensure that your Kafka brokers are version 0.11.x or +> The Kafka indexing service supports transactional topics which were introduced in Kafka 0.11.x. +> If you want to use this transaction feature of Kafka, then you need to set "isolation.level": "read_committed" in consumerProperties of Supervisor spec +> And ensure that your Kafka brokers are version 0.11.x or > better before using this functionality. Refer [Kafka upgrade guide](https://kafka.apache.org/documentation/#upgrade) > if you are using older version of Kafka brokers. @@ -104,7 +105,8 @@ A sample supervisor spec is shown below: "type": "json" }, "consumerProperties": { - "bootstrap.servers": "localhost:9092" + "bootstrap.servers": "localhost:9092", + "isolation.level": "read_committed" }, "taskCount": 1, "replicas": 1, @@ -132,7 +134,7 @@ A sample supervisor spec is shown below: |-----|----|-----------|--------| |`topic`|String|The Kafka topic to read from. This must be a specific topic as topic patterns are not supported.|yes| |`inputFormat`|Object|[`inputFormat`](../../ingestion/data-formats.md#input-format) to specify how to parse input data. See [the below section](#specifying-data-format) for details about specifying the input format.|yes| -|`consumerProperties`|Map|A map of properties to be passed to the Kafka consumer. This must contain a property `bootstrap.servers` with a list of Kafka brokers in the form: `:,:,...`. For SSL connections, the `keystore`, `truststore` and `key` passwords can be provided as a [Password Provider](../../operations/password-provider.md) or String password.|yes| +|`consumerProperties`|Map|A map of properties to be passed to the Kafka consumer. This must contain a property `bootstrap.servers` with a list of Kafka brokers in the form: `:,:,...`. And set "isolation.level": "read_committed" when you want to consume transactional topics of kafka.For SSL connections, the `keystore`, `truststore` and `key` passwords can be provided as a [Password Provider](../../operations/password-provider.md) or String password.|yes| |`pollTimeout`|Long|The length of time to wait for the Kafka consumer to poll records, in milliseconds|no (default == 100)| |`replicas`|Integer|The number of replica sets, where 1 means a single set of tasks (no replication). Replica tasks will always be assigned to different workers to provide resiliency against process failure.|no (default == 1)| |`taskCount`|Integer|The maximum number of *reading* tasks in a *replica set*. This means that the maximum number of reading tasks will be `taskCount * replicas` and the total number of tasks (*reading* + *publishing*) will be higher than this. See [Capacity Planning](#capacity-planning) below for more details. The number of reading tasks will be less than `taskCount` if `taskCount > {numKafkaPartitions}`.|no (default == 1)| From 885985a3a7d8494725e3a69f2b995df392a8214d Mon Sep 17 00:00:00 2001 From: yuezhang Date: Thu, 12 Nov 2020 11:05:54 +0800 Subject: [PATCH 03/19] yuezhang --- .../extensions-core/kafka-ingestion.md | 1 + .../indexing/kafka/KafkaConsumerConfigs.java | 5 ++- .../druid/indexing/kafka/KafkaIndexTask.java | 4 +- .../kafka/KafkaIndexTaskIOConfig.java | 13 ++++++ .../indexing/kafka/KafkaRecordSupplier.java | 10 +++-- .../indexing/kafka/KafkaSamplerSpec.java | 2 +- .../kafka/supervisor/KafkaSupervisor.java | 4 +- .../supervisor/KafkaSupervisorIOConfig.java | 11 +++++ .../indexing/kafka/KafkaIOConfigTest.java | 2 + .../indexing/kafka/KafkaIndexTaskTest.java | 41 +++++++++++++++++++ .../kafka/KafkaRecordSupplierTest.java | 28 +++++++------ .../indexing/kafka/KafkaSamplerSpecTest.java | 1 + .../KafkaSupervisorIOConfigTest.java | 1 + .../kafka/supervisor/KafkaSupervisorTest.java | 11 ++++- .../druid/indexing/kafka/test/TestBroker.java | 2 +- .../AbstractKafkaIndexingServiceTest.java | 2 +- 16 files changed, 112 insertions(+), 26 deletions(-) diff --git a/docs/development/extensions-core/kafka-ingestion.md b/docs/development/extensions-core/kafka-ingestion.md index b4ef89bfe530..6b19e0791ea2 100644 --- a/docs/development/extensions-core/kafka-ingestion.md +++ b/docs/development/extensions-core/kafka-ingestion.md @@ -146,6 +146,7 @@ A sample supervisor spec is shown below: |`lateMessageRejectionStartDateTime`|ISO8601 DateTime|Configure tasks to reject messages with timestamps earlier than this date time; for example if this is set to `2016-01-01T11:00Z` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z* will be dropped. This may help prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments (e.g. a realtime and a nightly batch ingestion pipeline).|no (default == none)| |`lateMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps earlier than this period before the task was created; for example if this is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z* will be dropped. This may help prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments (e.g. a realtime and a nightly batch ingestion pipeline). Please note that only one of `lateMessageRejectionPeriod` or `lateMessageRejectionStartDateTime` can be specified.|no (default == none)| |`earlyMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps later than this period after the task reached its taskDuration; for example if this is set to `PT1H`, the taskDuration is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps later than *2016-01-01T14:00Z* will be dropped. **Note:** Tasks sometimes run past their task duration, for example, in cases of supervisor failover. Setting earlyMessageRejectionPeriod too low may cause messages to be dropped unexpectedly whenever a task runs past its originally configured task duration.|no (default == none)| +|`consumeTransactionally`|Boolean|Set `consumeTransactionally` false here can disable druid to consume Kafka in a transactional way. And druid could consume lower version of Kafka now, such as 0.10.2.1 |no (default == true)| #### Specifying data format diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerConfigs.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerConfigs.java index 365be19cc03c..fef904d7c896 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerConfigs.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerConfigs.java @@ -31,13 +31,16 @@ public class KafkaConsumerConfigs { - public static Map getConsumerProperties() + public static Map getConsumerProperties(Boolean consumeTransactionally) { final Map props = new HashMap<>(); props.put("metadata.max.age.ms", "10000"); props.put("group.id", StringUtils.format("kafka-supervisor-%s", IdUtils.getRandomId())); props.put("auto.offset.reset", "none"); props.put("enable.auto.commit", "false"); + if (consumeTransactionally) { + props.put("isolation.level", "read_committed"); + } return props; } diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java index fb063532356d..ef1ba6b57b14 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java @@ -90,7 +90,7 @@ KafkaConsumer newConsumer() try { Thread.currentThread().setContextClassLoader(getClass().getClassLoader()); - final Map consumerConfigs = KafkaConsumerConfigs.getConsumerProperties(); + final Map consumerConfigs = KafkaConsumerConfigs.getConsumerProperties(ioConfig.getConsumeTransactionally()); final Properties props = new Properties(); KafkaRecordSupplier.addConsumerPropertiesFromConfig( props, @@ -143,7 +143,7 @@ protected KafkaRecordSupplier newTaskRecordSupplier() props.put("auto.offset.reset", "none"); - return new KafkaRecordSupplier(props, configMapper); + return new KafkaRecordSupplier(props, configMapper, ioConfig.getConsumeTransactionally()); } finally { Thread.currentThread().setContextClassLoader(currCtxCl); diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskIOConfig.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskIOConfig.java index ece3fa5ba82b..1cbec059a0d2 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskIOConfig.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskIOConfig.java @@ -30,12 +30,14 @@ import org.joda.time.DateTime; import javax.annotation.Nullable; + import java.util.Map; public class KafkaIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig { private final Map consumerProperties; private final long pollTimeout; + private final boolean consumeTransactionally; @JsonCreator public KafkaIndexTaskIOConfig( @@ -52,6 +54,7 @@ public KafkaIndexTaskIOConfig( @JsonProperty("endSequenceNumbers") @Nullable SeekableStreamEndSequenceNumbers endSequenceNumbers, @JsonProperty("consumerProperties") Map consumerProperties, + @JsonProperty("consumeTransactionally") Boolean consumeTransactionally, @JsonProperty("pollTimeout") Long pollTimeout, @JsonProperty("useTransaction") Boolean useTransaction, @JsonProperty("minimumMessageTime") DateTime minimumMessageTime, @@ -74,6 +77,7 @@ public KafkaIndexTaskIOConfig( this.consumerProperties = Preconditions.checkNotNull(consumerProperties, "consumerProperties"); this.pollTimeout = pollTimeout != null ? pollTimeout : KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS; + this.consumeTransactionally = consumeTransactionally != null ? consumeTransactionally : KafkaSupervisorIOConfig.DEFAULT_CONSUME_TRANSACTIONALLY; final SeekableStreamEndSequenceNumbers myEndSequenceNumbers = getEndSequenceNumbers(); for (int partition : myEndSequenceNumbers.getPartitionSequenceNumberMap().keySet()) { @@ -93,6 +97,7 @@ public KafkaIndexTaskIOConfig( SeekableStreamStartSequenceNumbers startSequenceNumbers, SeekableStreamEndSequenceNumbers endSequenceNumbers, Map consumerProperties, + Boolean consumeTransactionally, Long pollTimeout, Boolean useTransaction, DateTime minimumMessageTime, @@ -108,6 +113,7 @@ public KafkaIndexTaskIOConfig( startSequenceNumbers, endSequenceNumbers, consumerProperties, + consumeTransactionally, pollTimeout, useTransaction, minimumMessageTime, @@ -156,6 +162,12 @@ public long getPollTimeout() return pollTimeout; } + @JsonProperty + public Boolean getConsumeTransactionally() + { + return consumeTransactionally; + } + @Override public String toString() { @@ -165,6 +177,7 @@ public String toString() ", startSequenceNumbers=" + getStartSequenceNumbers() + ", endSequenceNumbers=" + getEndSequenceNumbers() + ", consumerProperties=" + consumerProperties + + ", consumeTransactionally=" + consumeTransactionally + ", pollTimeout=" + pollTimeout + ", useTransaction=" + isUseTransaction() + ", minimumMessageTime=" + getMinimumMessageTime() + diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java index 3ffe7fa83ec0..9ad188f133cb 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java @@ -37,6 +37,7 @@ import org.apache.kafka.common.serialization.Deserializer; import javax.annotation.Nonnull; + import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.lang.reflect.Type; @@ -57,10 +58,11 @@ public class KafkaRecordSupplier implements RecordSupplier public KafkaRecordSupplier( Map consumerProperties, - ObjectMapper sortingMapper + ObjectMapper sortingMapper, + Boolean consumeTransactionally ) { - this(getKafkaConsumer(sortingMapper, consumerProperties)); + this(getKafkaConsumer(sortingMapper, consumerProperties, consumeTransactionally)); } @VisibleForTesting @@ -232,9 +234,9 @@ private static Deserializer getKafkaDeserializer(Properties properties, String k return deserializerObject; } - private static KafkaConsumer getKafkaConsumer(ObjectMapper sortingMapper, Map consumerProperties) + private static KafkaConsumer getKafkaConsumer(ObjectMapper sortingMapper, Map consumerProperties, Boolean consumeTransactionally) { - final Map consumerConfigs = KafkaConsumerConfigs.getConsumerProperties(); + final Map consumerConfigs = KafkaConsumerConfigs.getConsumerProperties(consumeTransactionally); final Properties props = new Properties(); addConsumerPropertiesFromConfig(props, sortingMapper, consumerProperties); props.putAll(consumerConfigs); diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaSamplerSpec.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaSamplerSpec.java index 3981728cfc7d..5a1763b8c31b 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaSamplerSpec.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaSamplerSpec.java @@ -63,7 +63,7 @@ protected KafkaRecordSupplier createRecordSupplier() props.put("auto.offset.reset", "none"); props.put("request.timeout.ms", Integer.toString(samplerConfig.getTimeoutMs())); - return new KafkaRecordSupplier(props, objectMapper); + return new KafkaRecordSupplier(props, objectMapper, ((KafkaSupervisorIOConfig) ioConfig).getConsumeTransactionally()); } finally { Thread.currentThread().setContextClassLoader(currCtxCl); diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java index 021d91664f1b..a8218ce550ef 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java @@ -123,7 +123,8 @@ public KafkaSupervisor( @Override protected RecordSupplier setupRecordSupplier() { - return new KafkaRecordSupplier(spec.getIoConfig().getConsumerProperties(), sortingMapper); + KafkaSupervisorIOConfig ioConfig = spec.getIoConfig(); + return new KafkaRecordSupplier(ioConfig.getConsumerProperties(), sortingMapper, ioConfig.getConsumeTransactionally()); } @Override @@ -190,6 +191,7 @@ protected SeekableStreamIndexTaskIOConfig createTaskIoConfig( new SeekableStreamStartSequenceNumbers<>(kafkaIoConfig.getTopic(), startPartitions, Collections.emptySet()), new SeekableStreamEndSequenceNumbers<>(kafkaIoConfig.getTopic(), endPartitions), kafkaIoConfig.getConsumerProperties(), + kafkaIoConfig.getConsumeTransactionally(), kafkaIoConfig.getPollTimeout(), true, minimumMessageTime, diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java index a1360b5c63f3..5ec5bf212eec 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java @@ -37,9 +37,11 @@ public class KafkaSupervisorIOConfig extends SeekableStreamSupervisorIOConfig public static final String KEY_STORE_PASSWORD_KEY = "ssl.keystore.password"; public static final String KEY_PASSWORD_KEY = "ssl.key.password"; public static final long DEFAULT_POLL_TIMEOUT_MILLIS = 100; + public static final boolean DEFAULT_CONSUME_TRANSACTIONALLY = true; private final Map consumerProperties; private final long pollTimeout; + private final boolean consumeTransactionally; @JsonCreator @@ -50,6 +52,7 @@ public KafkaSupervisorIOConfig( @JsonProperty("taskCount") Integer taskCount, @JsonProperty("taskDuration") Period taskDuration, @JsonProperty("consumerProperties") Map consumerProperties, + @JsonProperty("consumeTransactionally") Boolean consumeTransactionally, @JsonProperty("pollTimeout") Long pollTimeout, @JsonProperty("startDelay") Period startDelay, @JsonProperty("period") Period period, @@ -81,6 +84,7 @@ public KafkaSupervisorIOConfig( StringUtils.format("consumerProperties must contain entry for [%s]", BOOTSTRAP_SERVERS_KEY) ); this.pollTimeout = pollTimeout != null ? pollTimeout : DEFAULT_POLL_TIMEOUT_MILLIS; + this.consumeTransactionally = consumeTransactionally != null ? consumeTransactionally : DEFAULT_CONSUME_TRANSACTIONALLY; } @JsonProperty @@ -95,6 +99,12 @@ public Map getConsumerProperties() return consumerProperties; } + @JsonProperty + public Boolean getConsumeTransactionally() + { + return consumeTransactionally; + } + @JsonProperty public long getPollTimeout() { @@ -116,6 +126,7 @@ public String toString() ", taskCount=" + getTaskCount() + ", taskDuration=" + getTaskDuration() + ", consumerProperties=" + consumerProperties + + ", consumeTransactionally=" + consumeTransactionally + ", pollTimeout=" + pollTimeout + ", startDelay=" + getStartDelay() + ", period=" + getPeriod() + diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIOConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIOConfigTest.java index 4de3c06e78ae..fc2cb7097db3 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIOConfigTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIOConfigTest.java @@ -313,6 +313,7 @@ public void testDeserializeToOldIoConfig() throws IOException new SeekableStreamStartSequenceNumbers<>("stream", ImmutableMap.of(1, 10L, 2, 5L), null), new SeekableStreamEndSequenceNumbers<>("stream", ImmutableMap.of(1, 20L, 2, 30L)), ImmutableMap.of("consumer", "properties"), + null, 100L, true, DateTimes.nowUtc(), @@ -325,6 +326,7 @@ public void testDeserializeToOldIoConfig() throws IOException oldMapper.registerSubtypes(new NamedType(OldKafkaIndexTaskIoConfig.class, "kafka")); final OldKafkaIndexTaskIoConfig oldConfig = (OldKafkaIndexTaskIoConfig) oldMapper.readValue(json, IOConfig.class); + Assert.assertEquals(true, currentConfig.getConsumeTransactionally()); Assert.assertEquals(currentConfig.getTaskGroupId().intValue(), oldConfig.taskGroupId); Assert.assertEquals(currentConfig.getBaseSequenceName(), oldConfig.baseSequenceName); Assert.assertEquals(currentConfig.getStartSequenceNumbers(), oldConfig.startPartitions.asStartPartitions(true)); diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java index 9b5373522803..f6b554015af6 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -137,6 +137,7 @@ import org.junit.runners.Parameterized; import javax.annotation.Nullable; + import java.io.File; import java.io.IOException; import java.util.Arrays; @@ -341,6 +342,7 @@ public void testRunAfterDataInserted() throws Exception new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(0, 2L), ImmutableSet.of()), new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, 5L)), kafkaServer.consumerProperties(), + null, KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, true, null, @@ -389,6 +391,7 @@ public void testRunAfterDataInsertedWithLegacyParser() throws Exception new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(0, 2L), ImmutableSet.of()), new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, 5L)), kafkaServer.consumerProperties(), + null, KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, true, null, @@ -432,6 +435,7 @@ public void testRunBeforeDataInserted() throws Exception new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(0, 2L), ImmutableSet.of()), new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, 5L)), kafkaServer.consumerProperties(), + null, KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, true, null, @@ -485,6 +489,7 @@ public void testRunAfterDataInsertedLiveReport() throws Exception new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(0, 2L), ImmutableSet.of()), new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, 12L)), kafkaServer.consumerProperties(), + null, KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, true, null, @@ -563,6 +568,7 @@ public void testIncrementalHandOff() throws Exception startPartitions, endPartitions, consumerProps, + null, KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, true, null, @@ -667,6 +673,7 @@ public void testIncrementalHandOffMaxTotalRows() throws Exception startPartitions, endPartitions, consumerProps, + null, KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, true, null, @@ -794,6 +801,7 @@ public void testTimeBasedIncrementalHandOff() throws Exception startPartitions, endPartitions, consumerProps, + null, KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, true, null, @@ -880,6 +888,7 @@ public void testIncrementalHandOffReadsThroughEndOffsets() throws Exception startPartitions, endPartitions, consumerProps, + null, KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, true, null, @@ -895,6 +904,7 @@ public void testIncrementalHandOffReadsThroughEndOffsets() throws Exception startPartitions, endPartitions, consumerProps, + null, KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, true, null, @@ -963,6 +973,7 @@ public void testRunWithMinimumMessageTime() throws Exception new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(0, 0L), ImmutableSet.of()), new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, 5L)), kafkaServer.consumerProperties(), + null, KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, true, DateTimes.of("2010"), @@ -1014,6 +1025,7 @@ public void testRunWithMaximumMessageTime() throws Exception new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(0, 0L), ImmutableSet.of()), new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, 5L)), kafkaServer.consumerProperties(), + null, KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, true, null, @@ -1074,6 +1086,7 @@ public void testRunWithTransformSpec() throws Exception new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(0, 0L), ImmutableSet.of()), new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, 5L)), kafkaServer.consumerProperties(), + null, KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, true, null, @@ -1127,6 +1140,7 @@ public void testRunOnNothing() throws Exception new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(0, 2L), ImmutableSet.of()), new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, 2L)), kafkaServer.consumerProperties(), + null, KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, true, null, @@ -1165,6 +1179,7 @@ public void testHandoffConditionTimeoutWhenHandoffOccurs() throws Exception new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(0, 2L), ImmutableSet.of()), new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, 5L)), kafkaServer.consumerProperties(), + null, KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, true, null, @@ -1214,6 +1229,7 @@ public void testHandoffConditionTimeoutWhenHandoffDoesNotOccur() throws Exceptio new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(0, 2L), ImmutableSet.of()), new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, 5L)), kafkaServer.consumerProperties(), + null, KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, true, null, @@ -1268,6 +1284,7 @@ public void testReportParseExceptions() throws Exception new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(0, 2L), ImmutableSet.of()), new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, 7L)), kafkaServer.consumerProperties(), + null, KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, true, null, @@ -1309,6 +1326,7 @@ public void testMultipleParseExceptionsSuccess() throws Exception new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(0, 2L), ImmutableSet.of()), new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, 13L)), kafkaServer.consumerProperties(), + null, KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, true, null, @@ -1387,6 +1405,7 @@ public void testMultipleParseExceptionsFailure() throws Exception new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(0, 2L), ImmutableSet.of()), new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, 10L)), kafkaServer.consumerProperties(), + null, KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, true, null, @@ -1448,6 +1467,7 @@ public void testRunReplicas() throws Exception new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(0, 2L), ImmutableSet.of()), new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, 5L)), kafkaServer.consumerProperties(), + null, KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, true, null, @@ -1463,6 +1483,7 @@ public void testRunReplicas() throws Exception new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(0, 2L), ImmutableSet.of()), new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, 5L)), kafkaServer.consumerProperties(), + null, KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, true, null, @@ -1514,6 +1535,7 @@ public void testRunConflicting() throws Exception new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(0, 2L), ImmutableSet.of()), new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, 5L)), kafkaServer.consumerProperties(), + null, KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, true, null, @@ -1529,6 +1551,7 @@ public void testRunConflicting() throws Exception new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(0, 3L), ImmutableSet.of()), new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, 10L)), kafkaServer.consumerProperties(), + null, KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, true, null, @@ -1582,6 +1605,7 @@ public void testRunConflictingWithoutTransactions() throws Exception new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(0, 2L), ImmutableSet.of()), new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, 5L)), kafkaServer.consumerProperties(), + null, KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, false, null, @@ -1597,6 +1621,7 @@ public void testRunConflictingWithoutTransactions() throws Exception new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(0, 3L), ImmutableSet.of()), new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, 10L)), kafkaServer.consumerProperties(), + null, KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, false, null, @@ -1648,6 +1673,7 @@ public void testRunOneTaskTwoPartitions() throws Exception new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(0, 2L, 1, 0L), ImmutableSet.of()), new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, 5L, 1, 2L)), kafkaServer.consumerProperties(), + null, KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, true, null, @@ -1696,6 +1722,7 @@ public void testRunTwoTasksTwoPartitions() throws Exception new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(0, 2L), ImmutableSet.of()), new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, 5L)), kafkaServer.consumerProperties(), + null, KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, true, null, @@ -1711,6 +1738,7 @@ public void testRunTwoTasksTwoPartitions() throws Exception new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(1, 0L), ImmutableSet.of()), new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(1, 1L)), kafkaServer.consumerProperties(), + null, KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, true, null, @@ -1765,6 +1793,7 @@ public void testRestore() throws Exception new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(0, 2L), ImmutableSet.of()), new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, 6L)), kafkaServer.consumerProperties(), + null, KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, true, null, @@ -1806,6 +1835,7 @@ public void testRestore() throws Exception new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(0, 2L), ImmutableSet.of()), new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, 6L)), kafkaServer.consumerProperties(), + null, KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, true, null, @@ -1868,6 +1898,7 @@ public void testRestoreAfterPersistingSequences() throws Exception new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(0, 0L), ImmutableSet.of()), new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, 10L)), consumerProps, + null, KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, true, null, @@ -1917,6 +1948,7 @@ public void testRestoreAfterPersistingSequences() throws Exception new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(0, 0L), ImmutableSet.of()), new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, 10L)), consumerProps, + null, KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, true, null, @@ -1979,6 +2011,7 @@ public void testRunWithPauseAndResume() throws Exception new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(0, 2L), ImmutableSet.of()), new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, 6L)), kafkaServer.consumerProperties(), + null, KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, true, null, @@ -2069,6 +2102,7 @@ public void testRunWithOffsetOutOfRangeExceptionAndPause() throws Exception new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(0, 2L), ImmutableSet.of()), new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, 5L)), kafkaServer.consumerProperties(), + null, KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, true, null, @@ -2105,6 +2139,7 @@ public void testRunWithOffsetOutOfRangeExceptionAndNextOffsetGreaterThanLeastAva new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(0, 200L), ImmutableSet.of()), new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, 500L)), kafkaServer.consumerProperties(), + KafkaSupervisorIOConfig.DEFAULT_CONSUME_TRANSACTIONALLY, KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, true, null, @@ -2151,6 +2186,7 @@ public void testRunContextSequenceAheadOfStartingOffsets() throws Exception new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(0, 0L), ImmutableSet.of()), new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, 5L)), kafkaServer.consumerProperties(), + null, KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, true, null, @@ -2198,6 +2234,7 @@ public void testRunWithDuplicateRequest() throws Exception new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(0, 200L), ImmutableSet.of()), new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, 500L)), kafkaServer.consumerProperties(), + null, KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, true, null, @@ -2234,6 +2271,7 @@ public void testRunTransactionModeRollback() throws Exception new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(0, 0L), ImmutableSet.of()), new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, 13L)), kafkaServer.consumerProperties(), + null, KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, true, null, @@ -2360,6 +2398,7 @@ public void testCanStartFromLaterThanEarliestOffset() throws Exception startPartitions, endPartitions, consumerProps, + null, KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, true, null, @@ -2382,6 +2421,7 @@ public void testRunWithoutDataInserted() throws Exception new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(0, 2L), ImmutableSet.of()), new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, 5L)), kafkaServer.consumerProperties(), + null, KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, true, null, @@ -2431,6 +2471,7 @@ public void testSerde() throws Exception new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(), ImmutableSet.of()), new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of()), ImmutableMap.of(), + null, KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, true, null, diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java index d15803352321..c527f07d5f36 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java @@ -194,7 +194,7 @@ public void testSupplierSetup() throws ExecutionException, InterruptedException ); KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier( - kafkaServer.consumerProperties(), OBJECT_MAPPER); + kafkaServer.consumerProperties(), OBJECT_MAPPER, true); Assert.assertTrue(recordSupplier.getAssignment().isEmpty()); @@ -224,7 +224,8 @@ public void testSupplierSetupCustomDeserializer() throws ExecutionException, Int KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier( properties, - OBJECT_MAPPER + OBJECT_MAPPER, + true ); Assert.assertTrue(recordSupplier.getAssignment().isEmpty()); @@ -255,7 +256,8 @@ public void testPollCustomDeserializer() throws InterruptedException, ExecutionE KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier( properties, - OBJECT_MAPPER + OBJECT_MAPPER, + true ); recordSupplier.assign(partitions); @@ -289,7 +291,7 @@ public void testPoll() throws InterruptedException, ExecutionException ); KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier( - kafkaServer.consumerProperties(), OBJECT_MAPPER); + kafkaServer.consumerProperties(), OBJECT_MAPPER, true); recordSupplier.assign(partitions); recordSupplier.seekToEarliest(partitions); @@ -330,7 +332,7 @@ public void testPollAfterMoreDataAdded() throws InterruptedException, ExecutionE KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier( - kafkaServer.consumerProperties(), OBJECT_MAPPER); + kafkaServer.consumerProperties(), OBJECT_MAPPER, true); recordSupplier.assign(partitions); recordSupplier.seekToEarliest(partitions); @@ -401,7 +403,7 @@ public void testSeek() throws InterruptedException, ExecutionException ); KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier( - kafkaServer.consumerProperties(), OBJECT_MAPPER); + kafkaServer.consumerProperties(), OBJECT_MAPPER, true); recordSupplier.assign(partitions); recordSupplier.seekToEarliest(partitions); @@ -444,7 +446,7 @@ public void testSeekToLatest() throws InterruptedException, ExecutionException ); KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier( - kafkaServer.consumerProperties(), OBJECT_MAPPER); + kafkaServer.consumerProperties(), OBJECT_MAPPER, true); recordSupplier.assign(partitions); recordSupplier.seekToEarliest(partitions); @@ -477,7 +479,7 @@ public void testSeekUnassigned() throws InterruptedException, ExecutionException ); KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier( - kafkaServer.consumerProperties(), OBJECT_MAPPER); + kafkaServer.consumerProperties(), OBJECT_MAPPER, true); recordSupplier.assign(partitions); @@ -503,7 +505,7 @@ public void testPosition() throws ExecutionException, InterruptedException ); KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier( - kafkaServer.consumerProperties(), OBJECT_MAPPER); + kafkaServer.consumerProperties(), OBJECT_MAPPER, true); recordSupplier.assign(partitions); recordSupplier.seekToEarliest(partitions); @@ -538,7 +540,7 @@ public void testPosition() throws ExecutionException, InterruptedException public void getLatestSequenceNumberWhenPartitionIsEmptyAndUseEarliestOffsetShouldReturnsValidNonNull() { KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier( - kafkaServer.consumerProperties(), OBJECT_MAPPER); + kafkaServer.consumerProperties(), OBJECT_MAPPER, true); StreamPartition streamPartition = StreamPartition.of(topic, 0); Set> partitions = ImmutableSet.of(streamPartition); recordSupplier.assign(partitions); @@ -550,7 +552,7 @@ public void getLatestSequenceNumberWhenPartitionIsEmptyAndUseEarliestOffsetShoul public void getEarliestSequenceNumberWhenPartitionIsEmptyAndUseEarliestOffsetShouldReturnsValidNonNull() { KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier( - kafkaServer.consumerProperties(), OBJECT_MAPPER); + kafkaServer.consumerProperties(), OBJECT_MAPPER, true); StreamPartition streamPartition = StreamPartition.of(topic, 0); Set> partitions = ImmutableSet.of(streamPartition); recordSupplier.assign(partitions); @@ -562,7 +564,7 @@ public void getEarliestSequenceNumberWhenPartitionIsEmptyAndUseEarliestOffsetSho public void getLatestSequenceNumberWhenPartitionIsEmptyAndUseLatestOffsetShouldReturnsValidNonNull() { KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier( - kafkaServer.consumerProperties(), OBJECT_MAPPER); + kafkaServer.consumerProperties(), OBJECT_MAPPER, true); StreamPartition streamPartition = StreamPartition.of(topic, 0); Set> partitions = ImmutableSet.of(streamPartition); recordSupplier.assign(partitions); @@ -574,7 +576,7 @@ public void getLatestSequenceNumberWhenPartitionIsEmptyAndUseLatestOffsetShouldR public void getEarliestSequenceNumberWhenPartitionIsEmptyAndUseLatestOffsetShouldReturnsValidNonNull() { KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier( - kafkaServer.consumerProperties(), OBJECT_MAPPER); + kafkaServer.consumerProperties(), OBJECT_MAPPER, true); StreamPartition streamPartition = StreamPartition.of(topic, 0); Set> partitions = ImmutableSet.of(streamPartition); recordSupplier.assign(partitions); diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java index 3768cebd62da..22553dec571d 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java @@ -134,6 +134,7 @@ public void testSample() null, null, null, + null, true, null, null, diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java index 50866890b0f1..79a6e2f45a81 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java @@ -80,6 +80,7 @@ public void testSerdeWithDefaults() throws Exception Assert.assertFalse("lateMessageRejectionPeriod", config.getLateMessageRejectionPeriod().isPresent()); Assert.assertFalse("earlyMessageRejectionPeriod", config.getEarlyMessageRejectionPeriod().isPresent()); Assert.assertFalse("lateMessageRejectionStartDateTime", config.getLateMessageRejectionStartDateTime().isPresent()); + Assert.assertEquals(true, config.getConsumeTransactionally()); } @Test diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index 251d8376a690..2619d43df15e 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -246,6 +246,7 @@ public void testCreateBaseTaskContexts() throws JsonProcessingException new SeekableStreamStartSequenceNumbers<>("test", Collections.emptyMap(), Collections.emptySet()), new SeekableStreamEndSequenceNumbers<>("test", Collections.emptyMap()), Collections.emptyMap(), + true, null, null, null, @@ -2445,7 +2446,9 @@ public void testCheckpointForInactiveTaskGroup() final TaskLocation location2 = new TaskLocation("testHost2", 145, -1); Collection workItems = new ArrayList<>(); workItems.add(new TestTaskRunnerWorkItem(id1, null, location1)); + workItems.add(new TestTaskRunnerWorkItem(id2, null, location2)); + workItems.add(new TestTaskRunnerWorkItem(id2, null, location2)); EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes(); @@ -3365,7 +3368,7 @@ private TestableKafkaSupervisor getTestableSupervisor( String kafkaHost ) { - final Map consumerProperties = KafkaConsumerConfigs.getConsumerProperties(); + final Map consumerProperties = KafkaConsumerConfigs.getConsumerProperties(true); consumerProperties.put("myCustomKey", "myCustomValue"); consumerProperties.put("bootstrap.servers", kafkaHost); KafkaSupervisorIOConfig kafkaSupervisorIOConfig = new KafkaSupervisorIOConfig( @@ -3375,6 +3378,7 @@ private TestableKafkaSupervisor getTestableSupervisor( taskCount, new Period(duration), consumerProperties, + null, KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, new Period("P1D"), new Period("PT30S"), @@ -3486,6 +3490,7 @@ private TestableKafkaSupervisor getTestableSupervisorCustomIsTaskCurrent( taskCount, new Period(duration), consumerProperties, + null, KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, new Period("P1D"), new Period("PT30S"), @@ -3601,6 +3606,7 @@ private KafkaSupervisor getSupervisor( taskCount, new Period(duration), consumerProperties, + null, KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, new Period("P1D"), new Period("PT30S"), @@ -3751,6 +3757,7 @@ private KafkaIndexTask createKafkaIndexTask( startPartitions, endPartitions, ImmutableMap.of("bootstrap.servers", kafkaHost), + null, KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, true, minimumMessageTime, @@ -3824,7 +3831,7 @@ public TestableKafkaSupervisor( @Override protected RecordSupplier setupRecordSupplier() { - final Map consumerConfigs = KafkaConsumerConfigs.getConsumerProperties(); + final Map consumerConfigs = KafkaConsumerConfigs.getConsumerProperties(true); consumerConfigs.put("metadata.max.age.ms", "1"); final Properties props = new Properties(); KafkaRecordSupplier.addConsumerPropertiesFromConfig(props, sortingMapper, consumerProperties); diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestBroker.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestBroker.java index 9d3f7c500c44..dbfed15045c9 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestBroker.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestBroker.java @@ -140,7 +140,7 @@ void commonClientProperties(Map props) public Map consumerProperties() { - final Map props = KafkaConsumerConfigs.getConsumerProperties(); + final Map props = KafkaConsumerConfigs.getConsumerProperties(true); props.put("bootstrap.servers", StringUtils.format("localhost:%d", getPort())); return props; } diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java index 204b6ef7259e..3056468e5805 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java @@ -56,7 +56,7 @@ Function generateStreamIngestionPropsTransform( IntegrationTestingConfig config ) { - final Map consumerConfigs = KafkaConsumerConfigs.getConsumerProperties(); + final Map consumerConfigs = KafkaConsumerConfigs.getConsumerProperties(true); final Properties consumerProperties = new Properties(); consumerProperties.putAll(consumerConfigs); consumerProperties.setProperty("bootstrap.servers", config.getKafkaInternalHost()); From 1b4d4d86ff3833058c2609b2f767e6facf1480e0 Mon Sep 17 00:00:00 2001 From: yuezhang Date: Thu, 12 Nov 2020 13:01:02 +0800 Subject: [PATCH 04/19] modify doc --- docs/development/extensions-core/kafka-ingestion.md | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/docs/development/extensions-core/kafka-ingestion.md b/docs/development/extensions-core/kafka-ingestion.md index 6b19e0791ea2..4f1d36c72f2c 100644 --- a/docs/development/extensions-core/kafka-ingestion.md +++ b/docs/development/extensions-core/kafka-ingestion.md @@ -33,9 +33,8 @@ manage failures, and ensure that the scalability and replication requirements ar This service is provided in the `druid-kafka-indexing-service` core Apache Druid extension (see [Including Extensions](../../development/extensions.md#loading-extensions)). -> The Kafka indexing service supports transactional topics which were introduced in Kafka 0.11.x. -> If you want to use this transaction feature of Kafka, then you need to set "isolation.level": "read_committed" in consumerProperties of Supervisor spec -> And ensure that your Kafka brokers are version 0.11.x or +> The Kafka indexing service supports transactional topics which were introduced in Kafka 0.11.x. These changes make the +> Kafka consumer that Druid uses incompatible with older brokers. Ensure that your Kafka brokers are version 0.11.x or > better before using this functionality. Refer [Kafka upgrade guide](https://kafka.apache.org/documentation/#upgrade) > if you are using older version of Kafka brokers. @@ -105,8 +104,7 @@ A sample supervisor spec is shown below: "type": "json" }, "consumerProperties": { - "bootstrap.servers": "localhost:9092", - "isolation.level": "read_committed" + "bootstrap.servers": "localhost:9092" }, "taskCount": 1, "replicas": 1, @@ -134,7 +132,7 @@ A sample supervisor spec is shown below: |-----|----|-----------|--------| |`topic`|String|The Kafka topic to read from. This must be a specific topic as topic patterns are not supported.|yes| |`inputFormat`|Object|[`inputFormat`](../../ingestion/data-formats.md#input-format) to specify how to parse input data. See [the below section](#specifying-data-format) for details about specifying the input format.|yes| -|`consumerProperties`|Map|A map of properties to be passed to the Kafka consumer. This must contain a property `bootstrap.servers` with a list of Kafka brokers in the form: `:,:,...`. And set "isolation.level": "read_committed" when you want to consume transactional topics of kafka.For SSL connections, the `keystore`, `truststore` and `key` passwords can be provided as a [Password Provider](../../operations/password-provider.md) or String password.|yes| +|`consumerProperties`|Map|A map of properties to be passed to the Kafka consumer. This must contain a property `bootstrap.servers` with a list of Kafka brokers in the form: `:,:,...`. For SSL connections, the `keystore`, `truststore` and `key` passwords can be provided as a [Password Provider](../../operations/password-provider.md) or String password.|yes| |`pollTimeout`|Long|The length of time to wait for the Kafka consumer to poll records, in milliseconds|no (default == 100)| |`replicas`|Integer|The number of replica sets, where 1 means a single set of tasks (no replication). Replica tasks will always be assigned to different workers to provide resiliency against process failure.|no (default == 1)| |`taskCount`|Integer|The maximum number of *reading* tasks in a *replica set*. This means that the maximum number of reading tasks will be `taskCount * replicas` and the total number of tasks (*reading* + *publishing*) will be higher than this. See [Capacity Planning](#capacity-planning) below for more details. The number of reading tasks will be less than `taskCount` if `taskCount > {numKafkaPartitions}`.|no (default == 1)| From 99e9b028d52622418974e4275c58c56e65c92c2c Mon Sep 17 00:00:00 2001 From: yuezhang Date: Tue, 17 Nov 2020 17:16:45 +0800 Subject: [PATCH 05/19] modify docs --- docs/development/extensions-core/kafka-ingestion.md | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/docs/development/extensions-core/kafka-ingestion.md b/docs/development/extensions-core/kafka-ingestion.md index 4f1d36c72f2c..c0296460171b 100644 --- a/docs/development/extensions-core/kafka-ingestion.md +++ b/docs/development/extensions-core/kafka-ingestion.md @@ -33,10 +33,11 @@ manage failures, and ensure that the scalability and replication requirements ar This service is provided in the `druid-kafka-indexing-service` core Apache Druid extension (see [Including Extensions](../../development/extensions.md#loading-extensions)). -> The Kafka indexing service supports transactional topics which were introduced in Kafka 0.11.x. These changes make the +> The Kafka indexing service supports transactional topics which were introduced in Kafka 0.11.x. It is the default behavior of Druid and make the > Kafka consumer that Druid uses incompatible with older brokers. Ensure that your Kafka brokers are version 0.11.x or -> better before using this functionality. Refer [Kafka upgrade guide](https://kafka.apache.org/documentation/#upgrade) -> if you are using older version of Kafka brokers. +> better before using this functionality. Refer [Kafka upgrade guide](https://kafka.apache.org/documentation/#upgrade). +> In addition, if you need Druid to consume lower version Kafka which is non-transactional like `0.10.x.x`, you could set `consumeTransactionally ` false in `KafkaSupervisorIOConfig`. + ## Tutorial From 808056c8163e7a03aedf227b4eb45fbb600f5589 Mon Sep 17 00:00:00 2001 From: yuezhang Date: Tue, 17 Nov 2020 18:26:22 +0800 Subject: [PATCH 06/19] fix kafkaindexTaskTest.java --- .../java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java index a3254234a715..156f0ed5ba64 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -2815,6 +2815,7 @@ public void testMultipleLinesJSONText() throws Exception new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(0, 0L), ImmutableSet.of()), new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, 4L)), kafkaServer.consumerProperties(), + null, KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, true, null, From 108d7b62e688a3ada0a37c73936ac4258f744d35 Mon Sep 17 00:00:00 2001 From: yuezhang Date: Thu, 19 Nov 2020 10:32:34 +0800 Subject: [PATCH 07/19] revert uncessary change --- .../org/apache/druid/indexing/kafka/KafkaRecordSupplier.java | 1 - .../druid/indexing/kafka/supervisor/KafkaSupervisorTest.java | 2 -- 2 files changed, 3 deletions(-) diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java index 9ad188f133cb..e2385c85c520 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java @@ -37,7 +37,6 @@ import org.apache.kafka.common.serialization.Deserializer; import javax.annotation.Nonnull; - import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.lang.reflect.Type; diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index 2619d43df15e..b968be430cb3 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -2446,9 +2446,7 @@ public void testCheckpointForInactiveTaskGroup() final TaskLocation location2 = new TaskLocation("testHost2", 145, -1); Collection workItems = new ArrayList<>(); workItems.add(new TestTaskRunnerWorkItem(id1, null, location1)); - workItems.add(new TestTaskRunnerWorkItem(id2, null, location2)); - workItems.add(new TestTaskRunnerWorkItem(id2, null, location2)); EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes(); From 827a43ac31b0498137260b10c0b10fd29496c9ed Mon Sep 17 00:00:00 2001 From: yuezhang Date: Mon, 23 Nov 2020 13:42:14 +0800 Subject: [PATCH 08/19] add more logs and modify docs --- .idea/misc.xml | 4 +- .../extensions-core/kafka-ingestion.md | 1 + .../indexing/kafka/KafkaConsumerConfigs.java | 5 +++ .../kafka/KafkaConsumerConfigsTest.java | 38 +++++++++++++++++++ 4 files changed, 46 insertions(+), 2 deletions(-) create mode 100644 extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaConsumerConfigsTest.java diff --git a/.idea/misc.xml b/.idea/misc.xml index bf2061d7392d..fe39ed623c9c 100644 --- a/.idea/misc.xml +++ b/.idea/misc.xml @@ -84,7 +84,7 @@ - + - + \ No newline at end of file diff --git a/docs/development/extensions-core/kafka-ingestion.md b/docs/development/extensions-core/kafka-ingestion.md index c0296460171b..e068fba2c6cb 100644 --- a/docs/development/extensions-core/kafka-ingestion.md +++ b/docs/development/extensions-core/kafka-ingestion.md @@ -37,6 +37,7 @@ This service is provided in the `druid-kafka-indexing-service` core Apache Druid > Kafka consumer that Druid uses incompatible with older brokers. Ensure that your Kafka brokers are version 0.11.x or > better before using this functionality. Refer [Kafka upgrade guide](https://kafka.apache.org/documentation/#upgrade). > In addition, if you need Druid to consume lower version Kafka which is non-transactional like `0.10.x.x`, you could set `consumeTransactionally ` false in `KafkaSupervisorIOConfig`. +> And make sure the offsets are sequential, because thers is no offset gap check in Druid anymore. ## Tutorial diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerConfigs.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerConfigs.java index fef904d7c896..ffdf3b8c45d9 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerConfigs.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerConfigs.java @@ -21,6 +21,7 @@ import org.apache.druid.common.utils.IdUtils; import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.logger.Logger; import java.util.HashMap; import java.util.Map; @@ -30,6 +31,7 @@ */ public class KafkaConsumerConfigs { + private static final Logger log = new Logger(KafkaConsumerConfigs.class); public static Map getConsumerProperties(Boolean consumeTransactionally) { @@ -39,7 +41,10 @@ public static Map getConsumerProperties(Boolean consumeTransacti props.put("auto.offset.reset", "none"); props.put("enable.auto.commit", "false"); if (consumeTransactionally) { + log.info("Set isolation.level read_committed, Druid Kafka Consumer will consume non-transactional and COMMITTED transactional records."); props.put("isolation.level", "read_committed"); + } else { + log.info("Ignore isolation.level, Druid Kafka Consumer will consume all records."); } return props; } diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaConsumerConfigsTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaConsumerConfigsTest.java new file mode 100644 index 000000000000..e6d29d983c3e --- /dev/null +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaConsumerConfigsTest.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.kafka; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.Map; + +public class KafkaConsumerConfigsTest +{ + @Test + public void testGetConsumerProperties() + { + Map consumerPropertiesWithCommitted = KafkaConsumerConfigs.getConsumerProperties(true); + Assert.assertTrue(String.valueOf(consumerPropertiesWithCommitted.get("isolation.level")).equalsIgnoreCase("read_committed")); + + Map consumerPropertiesWithUnCommitted = KafkaConsumerConfigs.getConsumerProperties(false); + Assert.assertNull(consumerPropertiesWithUnCommitted.get("isolation.level")); + } +} From 44a68ca3713388265a8d9e6ec79fdc7dabbf7b9a Mon Sep 17 00:00:00 2001 From: yuezhang Date: Mon, 23 Nov 2020 13:43:16 +0800 Subject: [PATCH 09/19] revert jdk version --- .idea/misc.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.idea/misc.xml b/.idea/misc.xml index fe39ed623c9c..61e7e5b21e77 100644 --- a/.idea/misc.xml +++ b/.idea/misc.xml @@ -84,7 +84,7 @@ - + \ No newline at end of file From 711f692705fb06389bfd38c20bd878eb07aee893 Mon Sep 17 00:00:00 2001 From: yuezhang Date: Mon, 23 Nov 2020 14:31:48 +0800 Subject: [PATCH 10/19] modify docs --- .idea/misc.xml | 2 +- docs/development/extensions-core/kafka-ingestion.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.idea/misc.xml b/.idea/misc.xml index 61e7e5b21e77..bf2061d7392d 100644 --- a/.idea/misc.xml +++ b/.idea/misc.xml @@ -87,4 +87,4 @@ - \ No newline at end of file + diff --git a/docs/development/extensions-core/kafka-ingestion.md b/docs/development/extensions-core/kafka-ingestion.md index e068fba2c6cb..fc34775c5fec 100644 --- a/docs/development/extensions-core/kafka-ingestion.md +++ b/docs/development/extensions-core/kafka-ingestion.md @@ -37,7 +37,7 @@ This service is provided in the `druid-kafka-indexing-service` core Apache Druid > Kafka consumer that Druid uses incompatible with older brokers. Ensure that your Kafka brokers are version 0.11.x or > better before using this functionality. Refer [Kafka upgrade guide](https://kafka.apache.org/documentation/#upgrade). > In addition, if you need Druid to consume lower version Kafka which is non-transactional like `0.10.x.x`, you could set `consumeTransactionally ` false in `KafkaSupervisorIOConfig`. -> And make sure the offsets are sequential, because thers is no offset gap check in Druid anymore. +> And make sure the offsets are sequential, because there is no offset gap check in Druid anymore. ## Tutorial From f4e31af7243a74205a9ab1c6b70d78dcb694e0df Mon Sep 17 00:00:00 2001 From: yuezhang Date: Tue, 1 Dec 2020 13:43:20 +0800 Subject: [PATCH 11/19] modify-kafka-version v2 --- docs/development/extensions-core/kafka-ingestion.md | 5 +++-- .../apache/druid/indexing/kafka/KafkaConsumerConfigs.java | 5 ++--- .../org/apache/druid/indexing/kafka/KafkaIndexTask.java | 6 ++++-- .../apache/druid/indexing/kafka/KafkaRecordSupplier.java | 2 +- .../indexing/kafka/supervisor/KafkaSupervisorTest.java | 4 ++-- .../org/apache/druid/indexing/kafka/test/TestBroker.java | 2 +- .../tests/indexer/AbstractKafkaIndexingServiceTest.java | 3 ++- 7 files changed, 15 insertions(+), 12 deletions(-) diff --git a/docs/development/extensions-core/kafka-ingestion.md b/docs/development/extensions-core/kafka-ingestion.md index 3eb3499fb9e7..1c71153ffbf4 100644 --- a/docs/development/extensions-core/kafka-ingestion.md +++ b/docs/development/extensions-core/kafka-ingestion.md @@ -33,10 +33,11 @@ manage failures, and ensure that the scalability and replication requirements ar This service is provided in the `druid-kafka-indexing-service` core Apache Druid extension (see [Including Extensions](../../development/extensions.md#loading-extensions)). -> The Kafka indexing service supports transactional topics which were introduced in Kafka 0.11.x. These changes make the +> The Kafka indexing service supports transactional topics which were introduced in Kafka 0.11.x. It is the default behavior of Druid and make the > Kafka consumer that Druid uses incompatible with older brokers. Ensure that your Kafka brokers are version 0.11.x or > better before using this functionality. Refer [Kafka upgrade guide](https://kafka.apache.org/documentation/#upgrade) > if you are using older version of Kafka brokers. +> In addition, users could set `isolation.level` false in `consumerProperties`, if don't need Druid to consume transactionally or need Druid to consume older versions of Kafka. ## Tutorial @@ -132,7 +133,7 @@ A sample supervisor spec is shown below: |-----|----|-----------|--------| |`topic`|String|The Kafka topic to read from. This must be a specific topic as topic patterns are not supported.|yes| |`inputFormat`|Object|[`inputFormat`](../../ingestion/data-formats.md#input-format) to specify how to parse input data. See [the below section](#specifying-data-format) for details about specifying the input format.|yes| -|`consumerProperties`|Map|A map of properties to be passed to the Kafka consumer. This must contain a property `bootstrap.servers` with a list of Kafka brokers in the form: `:,:,...`. For SSL connections, the `keystore`, `truststore` and `key` passwords can be provided as a [Password Provider](../../operations/password-provider.md) or String password.|yes| +|`consumerProperties`|Map|A map of properties to be passed to the Kafka consumer. This must contain a property `bootstrap.servers` with a list of Kafka brokers in the form: `:,:,...`. Users can set `isolation.level` false here if don't need Druid to consume transactionally or need Druid to consume older versions of Kafka. If For SSL connections, the `keystore`, `truststore` and `key` passwords can be provided as a [Password Provider](../../operations/password-provider.md) or String password.|yes| |`pollTimeout`|Long|The length of time to wait for the Kafka consumer to poll records, in milliseconds|no (default == 100)| |`replicas`|Integer|The number of replica sets, where 1 means a single set of tasks (no replication). Replica tasks will always be assigned to different workers to provide resiliency against process failure.|no (default == 1)| |`taskCount`|Integer|The maximum number of *reading* tasks in a *replica set*. This means that the maximum number of reading tasks will be `taskCount * replicas` and the total number of tasks (*reading* + *publishing*) will be higher than this. See [Capacity Planning](#capacity-planning) below for more details. The number of reading tasks will be less than `taskCount` if `taskCount > {numKafkaPartitions}`.|no (default == 1)| diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerConfigs.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerConfigs.java index e9c6e79358a3..7a00c7d720cc 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerConfigs.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerConfigs.java @@ -30,15 +30,14 @@ */ public class KafkaConsumerConfigs { - - public static Map getConsumerProperties() + public static Map getConsumerProperties(Map customerConsumerProperties) { final Map props = new HashMap<>(); props.put("metadata.max.age.ms", "10000"); props.put("group.id", StringUtils.format("kafka-supervisor-%s", IdUtils.getRandomId())); props.put("auto.offset.reset", "none"); props.put("enable.auto.commit", "false"); - props.put("isolation.level", "read_committed"); + props.put("isolation.level", customerConsumerProperties.getOrDefault("isolation.level", "read_committed")); return props; } diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java index fb063532356d..c3f218f17184 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java @@ -90,12 +90,14 @@ KafkaConsumer newConsumer() try { Thread.currentThread().setContextClassLoader(getClass().getClassLoader()); - final Map consumerConfigs = KafkaConsumerConfigs.getConsumerProperties(); + Map consumerProperties = ioConfig.getConsumerProperties(); + + final Map consumerConfigs = KafkaConsumerConfigs.getConsumerProperties(consumerProperties); final Properties props = new Properties(); KafkaRecordSupplier.addConsumerPropertiesFromConfig( props, configMapper, - ioConfig.getConsumerProperties() + consumerProperties ); props.putAll(consumerConfigs); diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java index 3ffe7fa83ec0..b862c84b1c0c 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java @@ -234,7 +234,7 @@ private static Deserializer getKafkaDeserializer(Properties properties, String k private static KafkaConsumer getKafkaConsumer(ObjectMapper sortingMapper, Map consumerProperties) { - final Map consumerConfigs = KafkaConsumerConfigs.getConsumerProperties(); + final Map consumerConfigs = KafkaConsumerConfigs.getConsumerProperties(consumerProperties); final Properties props = new Properties(); addConsumerPropertiesFromConfig(props, sortingMapper, consumerProperties); props.putAll(consumerConfigs); diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index 251d8376a690..73b609db5407 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -3365,7 +3365,7 @@ private TestableKafkaSupervisor getTestableSupervisor( String kafkaHost ) { - final Map consumerProperties = KafkaConsumerConfigs.getConsumerProperties(); + final Map consumerProperties = KafkaConsumerConfigs.getConsumerProperties(new HashMap<>()); consumerProperties.put("myCustomKey", "myCustomValue"); consumerProperties.put("bootstrap.servers", kafkaHost); KafkaSupervisorIOConfig kafkaSupervisorIOConfig = new KafkaSupervisorIOConfig( @@ -3824,7 +3824,7 @@ public TestableKafkaSupervisor( @Override protected RecordSupplier setupRecordSupplier() { - final Map consumerConfigs = KafkaConsumerConfigs.getConsumerProperties(); + final Map consumerConfigs = KafkaConsumerConfigs.getConsumerProperties(new HashMap<>()); consumerConfigs.put("metadata.max.age.ms", "1"); final Properties props = new Properties(); KafkaRecordSupplier.addConsumerPropertiesFromConfig(props, sortingMapper, consumerProperties); diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestBroker.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestBroker.java index 9d3f7c500c44..61a9b001f3db 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestBroker.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestBroker.java @@ -140,7 +140,7 @@ void commonClientProperties(Map props) public Map consumerProperties() { - final Map props = KafkaConsumerConfigs.getConsumerProperties(); + final Map props = KafkaConsumerConfigs.getConsumerProperties(new HashMap<>()); props.put("bootstrap.servers", StringUtils.format("localhost:%d", getPort())); return props; } diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java index 204b6ef7259e..5eba1abf1017 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java @@ -29,6 +29,7 @@ import org.apache.druid.testing.utils.StreamAdminClient; import org.apache.druid.testing.utils.StreamEventWriter; +import java.util.HashMap; import java.util.Map; import java.util.Properties; import java.util.function.Function; @@ -56,7 +57,7 @@ Function generateStreamIngestionPropsTransform( IntegrationTestingConfig config ) { - final Map consumerConfigs = KafkaConsumerConfigs.getConsumerProperties(); + final Map consumerConfigs = KafkaConsumerConfigs.getConsumerProperties(new HashMap<>()); final Properties consumerProperties = new Properties(); consumerProperties.putAll(consumerConfigs); consumerProperties.setProperty("bootstrap.servers", config.getKafkaInternalHost()); From 5f411718ca40901e8e9b5aa3a346be7002e92fd6 Mon Sep 17 00:00:00 2001 From: yuezhang Date: Tue, 1 Dec 2020 14:47:54 +0800 Subject: [PATCH 12/19] modify docs --- docs/development/extensions-core/kafka-ingestion.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/development/extensions-core/kafka-ingestion.md b/docs/development/extensions-core/kafka-ingestion.md index 1c71153ffbf4..f93b9469d322 100644 --- a/docs/development/extensions-core/kafka-ingestion.md +++ b/docs/development/extensions-core/kafka-ingestion.md @@ -37,7 +37,7 @@ This service is provided in the `druid-kafka-indexing-service` core Apache Druid > Kafka consumer that Druid uses incompatible with older brokers. Ensure that your Kafka brokers are version 0.11.x or > better before using this functionality. Refer [Kafka upgrade guide](https://kafka.apache.org/documentation/#upgrade) > if you are using older version of Kafka brokers. -> In addition, users could set `isolation.level` false in `consumerProperties`, if don't need Druid to consume transactionally or need Druid to consume older versions of Kafka. +> In addition, users could set `isolation.level` false in `consumerProperties`, if don't need Druid to consume transactional topics or need Druid to consume older versions of Kafka. ## Tutorial @@ -133,7 +133,7 @@ A sample supervisor spec is shown below: |-----|----|-----------|--------| |`topic`|String|The Kafka topic to read from. This must be a specific topic as topic patterns are not supported.|yes| |`inputFormat`|Object|[`inputFormat`](../../ingestion/data-formats.md#input-format) to specify how to parse input data. See [the below section](#specifying-data-format) for details about specifying the input format.|yes| -|`consumerProperties`|Map|A map of properties to be passed to the Kafka consumer. This must contain a property `bootstrap.servers` with a list of Kafka brokers in the form: `:,:,...`. Users can set `isolation.level` false here if don't need Druid to consume transactionally or need Druid to consume older versions of Kafka. If For SSL connections, the `keystore`, `truststore` and `key` passwords can be provided as a [Password Provider](../../operations/password-provider.md) or String password.|yes| +|`consumerProperties`|Map|A map of properties to be passed to the Kafka consumer. This must contain a property `bootstrap.servers` with a list of Kafka brokers in the form: `:,:,...`. Users can set `isolation.level` false here if don't need Druid to consume transactional topics or need Druid to consume older versions of Kafka. If For SSL connections, the `keystore`, `truststore` and `key` passwords can be provided as a [Password Provider](../../operations/password-provider.md) or String password.|yes| |`pollTimeout`|Long|The length of time to wait for the Kafka consumer to poll records, in milliseconds|no (default == 100)| |`replicas`|Integer|The number of replica sets, where 1 means a single set of tasks (no replication). Replica tasks will always be assigned to different workers to provide resiliency against process failure.|no (default == 1)| |`taskCount`|Integer|The maximum number of *reading* tasks in a *replica set*. This means that the maximum number of reading tasks will be `taskCount * replicas` and the total number of tasks (*reading* + *publishing*) will be higher than this. See [Capacity Planning](#capacity-planning) below for more details. The number of reading tasks will be less than `taskCount` if `taskCount > {numKafkaPartitions}`.|no (default == 1)| From 57761199f7dac7b35d20ea6b3a07f4fd831b597d Mon Sep 17 00:00:00 2001 From: yuezhang Date: Tue, 1 Dec 2020 17:47:19 +0800 Subject: [PATCH 13/19] modify docs --- docs/development/extensions-core/kafka-ingestion.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/development/extensions-core/kafka-ingestion.md b/docs/development/extensions-core/kafka-ingestion.md index f93b9469d322..b9411c8070a7 100644 --- a/docs/development/extensions-core/kafka-ingestion.md +++ b/docs/development/extensions-core/kafka-ingestion.md @@ -37,7 +37,7 @@ This service is provided in the `druid-kafka-indexing-service` core Apache Druid > Kafka consumer that Druid uses incompatible with older brokers. Ensure that your Kafka brokers are version 0.11.x or > better before using this functionality. Refer [Kafka upgrade guide](https://kafka.apache.org/documentation/#upgrade) > if you are using older version of Kafka brokers. -> In addition, users could set `isolation.level` false in `consumerProperties`, if don't need Druid to consume transactional topics or need Druid to consume older versions of Kafka. +> In addition, users could set `isolation.level` false in `consumerProperties`, if don't need Druid to consume transactional topics or need Druid to consume older versions of Kafka (Make sure offsets are sequential, since there is no offset gap check in Druid anymore). ## Tutorial From 061cac5f4f8172a0558da0b162fc2babacff2290 Mon Sep 17 00:00:00 2001 From: yuezhang Date: Tue, 1 Dec 2020 17:50:56 +0800 Subject: [PATCH 14/19] modify docs --- docs/development/extensions-core/kafka-ingestion.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/docs/development/extensions-core/kafka-ingestion.md b/docs/development/extensions-core/kafka-ingestion.md index b9411c8070a7..a37f8e00db87 100644 --- a/docs/development/extensions-core/kafka-ingestion.md +++ b/docs/development/extensions-core/kafka-ingestion.md @@ -37,7 +37,8 @@ This service is provided in the `druid-kafka-indexing-service` core Apache Druid > Kafka consumer that Druid uses incompatible with older brokers. Ensure that your Kafka brokers are version 0.11.x or > better before using this functionality. Refer [Kafka upgrade guide](https://kafka.apache.org/documentation/#upgrade) > if you are using older version of Kafka brokers. -> In addition, users could set `isolation.level` false in `consumerProperties`, if don't need Druid to consume transactional topics or need Druid to consume older versions of Kafka (Make sure offsets are sequential, since there is no offset gap check in Druid anymore). +> In addition, users could set `isolation.level` false in `consumerProperties`, if don't need Druid to consume transactional topics or need Druid to consume older versions of Kafka. +> Make sure offsets are sequential, since there is no offset gap check in Druid anymore. ## Tutorial From 8201b0b2ca8b74d2a209186708a9b55aa5cfcc57 Mon Sep 17 00:00:00 2001 From: yuezhang Date: Tue, 1 Dec 2020 21:08:20 +0800 Subject: [PATCH 15/19] modify docs --- docs/development/extensions-core/kafka-ingestion.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/development/extensions-core/kafka-ingestion.md b/docs/development/extensions-core/kafka-ingestion.md index a37f8e00db87..87e16ece0c28 100644 --- a/docs/development/extensions-core/kafka-ingestion.md +++ b/docs/development/extensions-core/kafka-ingestion.md @@ -37,7 +37,7 @@ This service is provided in the `druid-kafka-indexing-service` core Apache Druid > Kafka consumer that Druid uses incompatible with older brokers. Ensure that your Kafka brokers are version 0.11.x or > better before using this functionality. Refer [Kafka upgrade guide](https://kafka.apache.org/documentation/#upgrade) > if you are using older version of Kafka brokers. -> In addition, users could set `isolation.level` false in `consumerProperties`, if don't need Druid to consume transactional topics or need Druid to consume older versions of Kafka. +> In addition, users could set `isolation.level` `read_uncommitted` in `consumerProperties`, if don't need Druid to consume transactional topics or need Druid to consume older versions of Kafka. > Make sure offsets are sequential, since there is no offset gap check in Druid anymore. ## Tutorial @@ -134,7 +134,7 @@ A sample supervisor spec is shown below: |-----|----|-----------|--------| |`topic`|String|The Kafka topic to read from. This must be a specific topic as topic patterns are not supported.|yes| |`inputFormat`|Object|[`inputFormat`](../../ingestion/data-formats.md#input-format) to specify how to parse input data. See [the below section](#specifying-data-format) for details about specifying the input format.|yes| -|`consumerProperties`|Map|A map of properties to be passed to the Kafka consumer. This must contain a property `bootstrap.servers` with a list of Kafka brokers in the form: `:,:,...`. Users can set `isolation.level` false here if don't need Druid to consume transactional topics or need Druid to consume older versions of Kafka. If For SSL connections, the `keystore`, `truststore` and `key` passwords can be provided as a [Password Provider](../../operations/password-provider.md) or String password.|yes| +|`consumerProperties`|Map|A map of properties to be passed to the Kafka consumer. This must contain a property `bootstrap.servers` with a list of Kafka brokers in the form: `:,:,...`. Users can set `isolation.level` `read_uncommitted` here if don't need Druid to consume transactional topics or need Druid to consume older versions of Kafka. If For SSL connections, the `keystore`, `truststore` and `key` passwords can be provided as a [Password Provider](../../operations/password-provider.md) or String password.|yes| |`pollTimeout`|Long|The length of time to wait for the Kafka consumer to poll records, in milliseconds|no (default == 100)| |`replicas`|Integer|The number of replica sets, where 1 means a single set of tasks (no replication). Replica tasks will always be assigned to different workers to provide resiliency against process failure.|no (default == 1)| |`taskCount`|Integer|The maximum number of *reading* tasks in a *replica set*. This means that the maximum number of reading tasks will be `taskCount * replicas` and the total number of tasks (*reading* + *publishing*) will be higher than this. See [Capacity Planning](#capacity-planning) below for more details. The number of reading tasks will be less than `taskCount` if `taskCount > {numKafkaPartitions}`.|no (default == 1)| From 69b4d409de14cb83305e5d94e87751542256aa12 Mon Sep 17 00:00:00 2001 From: yuezhang Date: Tue, 1 Dec 2020 21:21:59 +0800 Subject: [PATCH 16/19] modify docs --- docs/development/extensions-core/kafka-ingestion.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/development/extensions-core/kafka-ingestion.md b/docs/development/extensions-core/kafka-ingestion.md index 87e16ece0c28..c128c1b2c476 100644 --- a/docs/development/extensions-core/kafka-ingestion.md +++ b/docs/development/extensions-core/kafka-ingestion.md @@ -134,7 +134,7 @@ A sample supervisor spec is shown below: |-----|----|-----------|--------| |`topic`|String|The Kafka topic to read from. This must be a specific topic as topic patterns are not supported.|yes| |`inputFormat`|Object|[`inputFormat`](../../ingestion/data-formats.md#input-format) to specify how to parse input data. See [the below section](#specifying-data-format) for details about specifying the input format.|yes| -|`consumerProperties`|Map|A map of properties to be passed to the Kafka consumer. This must contain a property `bootstrap.servers` with a list of Kafka brokers in the form: `:,:,...`. Users can set `isolation.level` `read_uncommitted` here if don't need Druid to consume transactional topics or need Druid to consume older versions of Kafka. If For SSL connections, the `keystore`, `truststore` and `key` passwords can be provided as a [Password Provider](../../operations/password-provider.md) or String password.|yes| +|`consumerProperties`|Map|A map of properties to be passed to the Kafka consumer. This must contain a property `bootstrap.servers` with a list of Kafka brokers in the form: `:,:,...`. Users can set `isolation.level` `read_uncommitted` here if don't need Druid to consume transactional topics or need Druid to consume older versions of Kafka. For SSL connections, the `keystore`, `truststore` and `key` passwords can be provided as a [Password Provider](../../operations/password-provider.md) or String password.|yes| |`pollTimeout`|Long|The length of time to wait for the Kafka consumer to poll records, in milliseconds|no (default == 100)| |`replicas`|Integer|The number of replica sets, where 1 means a single set of tasks (no replication). Replica tasks will always be assigned to different workers to provide resiliency against process failure.|no (default == 1)| |`taskCount`|Integer|The maximum number of *reading* tasks in a *replica set*. This means that the maximum number of reading tasks will be `taskCount * replicas` and the total number of tasks (*reading* + *publishing*) will be higher than this. See [Capacity Planning](#capacity-planning) below for more details. The number of reading tasks will be less than `taskCount` if `taskCount > {numKafkaPartitions}`.|no (default == 1)| From cc59d8bbe0175faa269a4bdc06a24f13f6dd220c Mon Sep 17 00:00:00 2001 From: yuezhang Date: Thu, 3 Dec 2020 12:45:02 +0800 Subject: [PATCH 17/19] done --- .../apache/druid/indexing/kafka/KafkaConsumerConfigs.java | 3 +-- .../org/apache/druid/indexing/kafka/KafkaIndexTask.java | 6 ++---- .../apache/druid/indexing/kafka/KafkaRecordSupplier.java | 2 +- .../indexing/kafka/supervisor/KafkaSupervisorIOConfig.java | 1 + .../indexing/kafka/supervisor/KafkaSupervisorTest.java | 4 ++-- .../org/apache/druid/indexing/kafka/test/TestBroker.java | 2 +- .../tests/indexer/AbstractKafkaIndexingServiceTest.java | 2 +- 7 files changed, 9 insertions(+), 11 deletions(-) diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerConfigs.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerConfigs.java index 27bd9e91c6ae..365be19cc03c 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerConfigs.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerConfigs.java @@ -31,14 +31,13 @@ public class KafkaConsumerConfigs { - public static Map getConsumerProperties(Map customerConsumerProperties) + public static Map getConsumerProperties() { final Map props = new HashMap<>(); props.put("metadata.max.age.ms", "10000"); props.put("group.id", StringUtils.format("kafka-supervisor-%s", IdUtils.getRandomId())); props.put("auto.offset.reset", "none"); props.put("enable.auto.commit", "false"); - props.put("isolation.level", customerConsumerProperties.getOrDefault("isolation.level", "read_committed")); return props; } diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java index c3f218f17184..fb063532356d 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java @@ -90,14 +90,12 @@ KafkaConsumer newConsumer() try { Thread.currentThread().setContextClassLoader(getClass().getClassLoader()); - Map consumerProperties = ioConfig.getConsumerProperties(); - - final Map consumerConfigs = KafkaConsumerConfigs.getConsumerProperties(consumerProperties); + final Map consumerConfigs = KafkaConsumerConfigs.getConsumerProperties(); final Properties props = new Properties(); KafkaRecordSupplier.addConsumerPropertiesFromConfig( props, configMapper, - consumerProperties + ioConfig.getConsumerProperties() ); props.putAll(consumerConfigs); diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java index b862c84b1c0c..3ffe7fa83ec0 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java @@ -234,7 +234,7 @@ private static Deserializer getKafkaDeserializer(Properties properties, String k private static KafkaConsumer getKafkaConsumer(ObjectMapper sortingMapper, Map consumerProperties) { - final Map consumerConfigs = KafkaConsumerConfigs.getConsumerProperties(consumerProperties); + final Map consumerConfigs = KafkaConsumerConfigs.getConsumerProperties(); final Properties props = new Properties(); addConsumerPropertiesFromConfig(props, sortingMapper, consumerProperties); props.putAll(consumerConfigs); diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java index a1360b5c63f3..42b1db0d6901 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java @@ -76,6 +76,7 @@ public KafkaSupervisorIOConfig( ); this.consumerProperties = Preconditions.checkNotNull(consumerProperties, "consumerProperties"); + consumerProperties.putIfAbsent("isolation.level", "read_committed"); Preconditions.checkNotNull( consumerProperties.get(BOOTSTRAP_SERVERS_KEY), StringUtils.format("consumerProperties must contain entry for [%s]", BOOTSTRAP_SERVERS_KEY) diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index 73b609db5407..251d8376a690 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -3365,7 +3365,7 @@ private TestableKafkaSupervisor getTestableSupervisor( String kafkaHost ) { - final Map consumerProperties = KafkaConsumerConfigs.getConsumerProperties(new HashMap<>()); + final Map consumerProperties = KafkaConsumerConfigs.getConsumerProperties(); consumerProperties.put("myCustomKey", "myCustomValue"); consumerProperties.put("bootstrap.servers", kafkaHost); KafkaSupervisorIOConfig kafkaSupervisorIOConfig = new KafkaSupervisorIOConfig( @@ -3824,7 +3824,7 @@ public TestableKafkaSupervisor( @Override protected RecordSupplier setupRecordSupplier() { - final Map consumerConfigs = KafkaConsumerConfigs.getConsumerProperties(new HashMap<>()); + final Map consumerConfigs = KafkaConsumerConfigs.getConsumerProperties(); consumerConfigs.put("metadata.max.age.ms", "1"); final Properties props = new Properties(); KafkaRecordSupplier.addConsumerPropertiesFromConfig(props, sortingMapper, consumerProperties); diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestBroker.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestBroker.java index 61a9b001f3db..9d3f7c500c44 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestBroker.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestBroker.java @@ -140,7 +140,7 @@ void commonClientProperties(Map props) public Map consumerProperties() { - final Map props = KafkaConsumerConfigs.getConsumerProperties(new HashMap<>()); + final Map props = KafkaConsumerConfigs.getConsumerProperties(); props.put("bootstrap.servers", StringUtils.format("localhost:%d", getPort())); return props; } diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java index 5eba1abf1017..90b7f05415d5 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java @@ -57,7 +57,7 @@ Function generateStreamIngestionPropsTransform( IntegrationTestingConfig config ) { - final Map consumerConfigs = KafkaConsumerConfigs.getConsumerProperties(new HashMap<>()); + final Map consumerConfigs = KafkaConsumerConfigs.getConsumerProperties(); final Properties consumerProperties = new Properties(); consumerProperties.putAll(consumerConfigs); consumerProperties.setProperty("bootstrap.servers", config.getKafkaInternalHost()); From 8197b13bd2e66770297f0e6616073973cc9e9abc Mon Sep 17 00:00:00 2001 From: yuezhang Date: Thu, 3 Dec 2020 12:48:25 +0800 Subject: [PATCH 18/19] remove useless import --- .../druid/tests/indexer/AbstractKafkaIndexingServiceTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java index 90b7f05415d5..204b6ef7259e 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java @@ -29,7 +29,6 @@ import org.apache.druid.testing.utils.StreamAdminClient; import org.apache.druid.testing.utils.StreamEventWriter; -import java.util.HashMap; import java.util.Map; import java.util.Properties; import java.util.function.Function; From 2a457d59c202e1a446ddbc53f198849d47cf59fb Mon Sep 17 00:00:00 2001 From: yuezhang Date: Thu, 3 Dec 2020 22:09:32 +0800 Subject: [PATCH 19/19] change code and add UT --- .../druid/indexing/kafka/KafkaIndexTask.java | 1 + .../indexing/kafka/KafkaRecordSupplier.java | 1 + .../supervisor/KafkaSupervisorIOConfig.java | 1 - .../indexing/kafka/KafkaIndexTaskTest.java | 41 +++++++++++++++++++ 4 files changed, 43 insertions(+), 1 deletion(-) diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java index fb063532356d..7e09acf8cb89 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java @@ -97,6 +97,7 @@ KafkaConsumer newConsumer() configMapper, ioConfig.getConsumerProperties() ); + props.putIfAbsent("isolation.level", "read_committed"); props.putAll(consumerConfigs); return new KafkaConsumer<>(props); diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java index 3ffe7fa83ec0..3c9485365bf7 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java @@ -237,6 +237,7 @@ private static KafkaConsumer getKafkaConsumer(ObjectMapper sorti final Map consumerConfigs = KafkaConsumerConfigs.getConsumerProperties(); final Properties props = new Properties(); addConsumerPropertiesFromConfig(props, sortingMapper, consumerProperties); + props.putIfAbsent("isolation.level", "read_committed"); props.putAll(consumerConfigs); ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader(); diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java index 42b1db0d6901..a1360b5c63f3 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java @@ -76,7 +76,6 @@ public KafkaSupervisorIOConfig( ); this.consumerProperties = Preconditions.checkNotNull(consumerProperties, "consumerProperties"); - consumerProperties.putIfAbsent("isolation.level", "read_committed"); Preconditions.checkNotNull( consumerProperties.get(BOOTSTRAP_SERVERS_KEY), StringUtils.format("consumerProperties must contain entry for [%s]", BOOTSTRAP_SERVERS_KEY) diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java index aa4385e67335..fa86c49933a6 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -2329,6 +2329,47 @@ public void testRunTransactionModeRollback() throws Exception ); } + @Test(timeout = 60_000L) + public void testRunUnTransactionMode() throws Exception + { + Map configs = kafkaServer.consumerProperties(); + configs.put("isolation.level", "read_uncommitted"); + final KafkaIndexTask task = createTask( + null, + new KafkaIndexTaskIOConfig( + 0, + "sequence0", + new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(0, 0L), ImmutableSet.of()), + new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, 13L)), + configs, + KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, + true, + null, + null, + INPUT_FORMAT + ) + ); + + final ListenableFuture future = runTask(task); + + // Insert 2 records initially + try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { + kafkaProducer.initTransactions(); + kafkaProducer.beginTransaction(); + for (ProducerRecord record : Iterables.limit(records, 2)) { + kafkaProducer.send(record).get(); + } + kafkaProducer.flush(); + kafkaProducer.abortTransaction(); + } + + while (countEvents(task) != 2) { + Thread.sleep(25); + } + + Assert.assertEquals(2, countEvents(task)); + } + @Test(timeout = 60_000L) public void testCanStartFromLaterThanEarliestOffset() throws Exception {